2 This file is part of GNUnet
3 (C) 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file set/gnunet-service-set_intersection.c
23 * @brief two-peer set intersection
24 * @author Christian Fuchs
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "strata_estimator.h"
30 #include "set_protocol.h"
/**
 * Phase an intersection operation is currently in.
 */
enum IntersectionOperationPhase
{
  /**
   * We got our tunnel but have not received any message so far.
   */
  PHASE_EXPECT_INITIAL,

  /**
   * We expect a bloom filter plus the number of elements
   * the other peer holds.
   */
  PHASE_BF_EXCHANGE,

  /**
   * The protocol is over.
   * Results may still have to be sent to the client.
   */
  PHASE_FINISHED
};
55 * State of an evaluate operation
61 * Tunnel to the remote peer.
63 struct GNUNET_MESH_Tunnel *tunnel;
66 * Detail information about the set operation,
67 * including the set to use.
69 struct OperationSpecification *spec;
72 * Message queue for the peer.
74 struct GNUNET_MQ_Handle *mq;
77 * The bf we currently receive
79 struct GNUNET_CONTAINER_BloomFilter *remote_bf;
82 * BF of the set's element.
84 struct GNUNET_CONTAINER_BloomFilter *local_bf;
87 * Current state of the operation.
89 enum IntersectionOperationPhase phase;
92 * Generation in which the operation handle
95 unsigned int generation_created;
98 * Set state of the set that this operation
104 * Maps element-id-hashes to 'elements in our set'.
106 struct GNUNET_CONTAINER_MultiHashMap *contained_elements;
109 * Current element count contained within contained_elements
111 uint64_t contained_elements_count;
114 * Iterator for sending elements on the key to element mapping to the client.
116 struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
119 * Evaluate operations are held in
122 struct OperationState *next;
125 * Evaluate operations are held in
128 struct OperationState *prev;
131 * Did we send the client that we are done?
133 int client_done_sent;
/**
 * Extra per-set state required for efficient set intersection.
 */
struct SetState
{
  /**
   * Head of the linked list of evaluate operations on this set.
   */
  struct OperationState *ops_head;

  /**
   * Tail of the linked list of evaluate operations on this set.
   */
  struct OperationState *ops_tail;
};
157 * Destroy a intersection operation, and free all resources
158 * associated with it.
160 * @param eo the intersection operation to destroy
163 intersection_operation_destroy (struct OperationState *eo)
165 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
166 GNUNET_CONTAINER_DLL_remove (eo->set->state->ops_head,
167 eo->set->state->ops_tail,
171 GNUNET_MQ_destroy (eo->mq);
174 if (NULL != eo->tunnel)
176 struct GNUNET_MESH_Tunnel *t = eo->tunnel;
178 GNUNET_MESH_tunnel_destroy (t);
180 // TODO: destroy set elements?
181 if (NULL != eo->spec)
183 if (NULL != eo->spec->context_msg)
185 GNUNET_free (eo->spec->context_msg);
186 eo->spec->context_msg = NULL;
188 GNUNET_free (eo->spec);
193 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
195 /* FIXME: do a garbage collection of the set generations */
200 * Inform the client that the intersection operation has failed,
201 * and proceed to destroy the evaluate operation.
203 * @param eo the intersection operation to fail
206 fail_intersection_operation (struct OperationState *eo)
208 struct GNUNET_MQ_Envelope *ev;
209 struct GNUNET_SET_ResultMessage *msg;
211 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
212 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
213 msg->request_id = htonl (eo->spec->client_request_id);
214 msg->element_type = htons (0);
215 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
216 intersection_operation_destroy (eo);
221 * Send a request for the evaluate operation to a remote peer
223 * @param eo operation with the other peer
226 send_operation_request (struct Operation *op)
228 struct GNUNET_MQ_Envelope *ev;
229 struct OperationRequestMessage *msg;
231 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
232 op->spec->context_msg);
236 /* the context message is too large */
238 GNUNET_SERVER_client_disconnect (op->spec->set->client);
241 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
242 msg->app_id = op->spec->app_id;
243 msg->salt = htonl (op->spec->salt);
244 GNUNET_MQ_send (op->mq, ev);
246 if (NULL != op->spec->context_msg)
247 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
249 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
251 if (NULL != op->spec->context_msg)
253 GNUNET_free (op->spec->context_msg);
254 op->spec->context_msg = NULL;
261 * Handle an BF message from a remote peer.
263 * @param cls the intersection operation
264 * @param mh the header of the message
267 handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
269 struct OperationState *eo = cls;
270 struct BFMessage *msg = (struct BFMessage *) mh;
271 unsigned int buckets_in_message;
273 if (eo->phase == PHASE_EXPECT_INITIAL )
275 eo->phase = PHASE_BF_EXCHANGE;
277 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new bf of size %u\n", 1<<msg->order);
279 // if (the remote peer has less elements than us)
280 // run our elements through his bloomfilter
281 // else if (we have the same elements)
284 // evict elements we can exclude through the bloomfilter
286 // create a new bloomfilter over our remaining elements
288 // send our new count and the bloomfilter back
290 else if (eo->phase == PHASE_BF_EXCHANGE)
299 * Send a result message to the client indicating
300 * that there is a new element.
302 * @param eo intersection operation
303 * @param element element to send
306 send_client_element (struct OperationState *eo,
307 struct GNUNET_SET_Element *element)
309 struct GNUNET_MQ_Envelope *ev;
310 struct GNUNET_SET_ResultMessage *rm;
312 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
313 GNUNET_assert (0 != eo->spec->client_request_id);
314 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
317 GNUNET_MQ_discard (ev);
321 rm->result_status = htons (GNUNET_SET_STATUS_OK);
322 rm->request_id = htonl (eo->spec->client_request_id);
323 rm->element_type = element->type;
324 memcpy (&rm[1], element->data, element->size);
325 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
330 * Send a result message to the client indicating
331 * that the operation is over.
332 * After the result done message has been sent to the client,
333 * destroy the evaluate operation.
335 * @param eo intersection operation
338 send_client_done_and_destroy (struct OperationState *eo)
340 struct GNUNET_MQ_Envelope *ev;
341 struct GNUNET_SET_ResultMessage *rm;
343 GNUNET_assert (GNUNET_NO == eo->client_done_sent);
345 eo->client_done_sent = GNUNET_YES;
347 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
348 rm->request_id = htonl (eo->spec->client_request_id);
349 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
350 rm->element_type = htons (0);
351 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
353 intersection_operation_destroy (eo);
/**
 * Send a bloomfilter to our peer.
 *
 * @param op intersection operation
 */
/*
 * Sends a filter to the remote peer, split into messages of at most
 * MAX_BUCKETS_PER_MESSAGE buckets each, and then sets the phase to
 * PHASE_BF_EXCHANGE.
 *
 * NOTE(review): this block looks like an unfinished adaptation of
 * invertible-bloom-filter code.  It uses `eo`, `ibf`, `ibf_order` and
 * `bf_order`, none of which is declared in the lines visible here (the
 * parameter is `op`), and `bf` is declared but not used in the visible
 * lines -- confirm before relying on it.
 */
365 send_bloomfilter (struct Operation *op){
366 //get number of all elements still in the set
368 // send the bloomfilter
369 unsigned int buckets_sent = 0;
370 struct BloomFilter *bf;
372 // add all our elements to the bloomfilter
373 // create new bloomfilter for all our elements & count elements
374 //GNUNET_CONTAINER_multihashmap32_remove
375 //eo->local_bf = GNUNET_CONTAINER_multihashmap32_iterate(eo->set->elements, add);
379 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n", 1<<ibf_order);
/* one message per iteration until all (1 << bf_order) buckets are sent */
383 while (buckets_sent < (1 << bf_order))
385 unsigned int buckets_in_message;
386 struct GNUNET_MQ_Envelope *ev;
/* NOTE(review): the payload is built as a struct IBFMessage although the
   message type used below is GNUNET_MESSAGE_TYPE_SET_P2P_BF; the receiving
   dispatcher in this file matches GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF
   -- confirm which struct and which message type are intended */
387 struct IBFMessage *msg;
/* buckets still outstanding, capped to what fits into a single message */
389 buckets_in_message = (1 << bf_order) - buckets_sent;
390 /* limit to maximum */
391 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
392 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
394 ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
395 GNUNET_MESSAGE_TYPE_SET_P2P_BF);
/* NOTE(review): the result of GNUNET_MQ_msg_extra is not checked for NULL
   in the lines visible here */
397 msg->order = bf_order;
398 msg->offset = htons (buckets_sent);
/* the slice is written directly behind the message header */
399 ibf_write_slice (ibf, buckets_sent,
400 buckets_in_message, &msg[1]);
401 buckets_sent += buckets_in_message;
402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
403 buckets_in_message, buckets_sent, 1<<ibf_order);
404 GNUNET_MQ_send (eo->mq, ev);
407 eo->phase = PHASE_BF_EXCHANGE;
411 * Handle a done message from a remote peer
413 * @param cls the intersection operation
414 * @param mh the message
417 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
419 struct OperationState *eo = cls;
420 struct GNUNET_MQ_Envelope *ev;
422 if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
424 /* we got all requests, but still have to send our elements as response */
426 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
427 eo->phase = PHASE_FINISHED;
428 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
429 GNUNET_MQ_send (eo->mq, ev);
432 if (eo->phase == PHASE_EXPECT_ELEMENTS)
434 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
435 eo->phase = PHASE_FINISHED;
436 send_client_done_and_destroy (eo);
440 fail_intersection_operation (eo);
445 * Evaluate a union operation with
448 * @param op operation to evaluate
451 intersection_evaluate (struct Operation *op)
453 op->state = GNUNET_new (struct OperationState);
454 /* we started the operation, thus we have to send the operation request */
455 op->state->phase = PHASE_BF_EXCHANGE;
456 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating intersection operation");
457 send_operation_request (op);
464 * fills the contained-elements hashmap with all relevant
465 * elements and adds their mutated hashes to our local bloomfilter with mutator+1
468 * @param key current key code
469 * @param value value in the hash map
470 * @return #GNUNET_YES if we should continue to
475 intersection_iterator_set_to_contained_alice (void *cls,
476 const struct GNUNET_HashCode *key,
478 struct ElementEntry *ee = value;
479 struct Operation *op = cls;
480 struct GNUNET_HashCode mutated_hash;
482 //only consider this element, if it is valid for us
483 if ((op->generation_created >= ee->generation_removed)
484 || (op->generation_created < ee->generation_added))
487 // not contained according to bob's bloomfilter
488 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
489 if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
493 op->state->contained_elements_count++;
494 GNUNET_CONTAINER_multihashmap_put (op->state->contained_elements,
495 &ee->element_hash, ee,
496 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
498 // create our own bloomfilter with salt+1
499 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt+1, &mutated_hash);
500 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
509 * fills the contained-elements hashmap with all relevant
510 * elements and adds their mutated hashes to our local bloomfilter
513 * @param key current key code
514 * @param value value in the hash map
515 * @return #GNUNET_YES if we should continue to
520 intersection_iterator_set_to_contained_bob (void *cls,
521 const struct GNUNET_HashCode *key,
523 struct ElementEntry *ee = value;
524 struct Operation *op = cls;
525 struct GNUNET_HashCode mutated_hash;
527 //only consider this element, if it is valid for us
528 if ((op->generation_created >= ee->generation_removed)
529 || (op->generation_created < ee->generation_added))
532 GNUNET_CONTAINER_multihashmap_put (op->state->contained_elements,
533 &ee->element_hash, ee,
534 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
536 op->state->contained_elements_count++;
538 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
540 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
547 * removes element from a hashmap if it is not contained within the
548 * provided remote bloomfilter.
551 * @param key current key code
552 * @param value value in the hash map
553 * @return #GNUNET_YES if we should continue to
558 intersection_iterator_element_removal (void *cls,
559 const struct GNUNET_HashCode *key,
561 struct ElementEntry *ee = value;
562 struct Operation *op = cls;
563 struct GNUNET_HashCode mutated_hash;
565 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
567 if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
569 op->state->contained_elements_count--;
570 GNUNET_CONTAINER_multihashmap_remove (op->state->contained_elements,
579 * removes element from a hashmap if it is not contained within the
580 * provided remote bloomfilter.
583 * @param key current key code
584 * @param value value in the hash map
585 * @return #GNUNET_YES if we should continue to
590 intersection_iterator_create_bf (void *cls,
591 const struct GNUNET_HashCode *key,
593 struct ElementEntry *ee = value;
594 struct Operation *op = cls;
595 struct GNUNET_HashCode mutated_hash;
597 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
599 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
607 * Accept an union operation request from a remote peer.
608 * Only initializes the private operation state.
610 * @param op operation that will be accepted as a union operation
613 intersection_accept (struct Operation *op)
615 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
616 op->state = GNUNET_new (struct OperationState);
618 op->state->contained_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
620 GNUNET_CONTAINER_multihashmap_iterate(op->spec->set->elements,
621 &intersection_iterator_set_to_contained_bob,
625 op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init(NULL, sizeof(struct GNUNET_HashCode), GNUNET_CONSTANTS_BLOOMFILTER_K);
627 if (NULL != op->state->remote_bf){
628 // run the set through the remote bloomfilter
635 /* kick off the operation */
636 send_bloomfilter (op);
641 * Create a new set supporting the intersection operation
643 * @return the newly created set
645 static struct SetState *
646 intersection_set_create (void)
648 struct SetState *set_state;
650 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "intersection set created\n");
652 set_state = GNUNET_new (struct SetState);
659 * Add the element from the given element message to the set.
661 * @param set_state state of the set we want to add to
662 * @param ee the element to add to the set
/* Registered as the `.add` callback of the vtable returned by
   _GSS_intersection_vt().
   NOTE(review): the body of this function is not visible in this
   excerpt, so nothing is stated here about what it does with
   set_state or ee. */
665 intersection_add (struct SetState *set_state, struct ElementEntry *ee)
/**
 * Destroy the state of a set that supports the intersection operation.
 *
 * Only the state structure itself is freed here.
 *
 * @param set_state the set state to destroy, must not be used afterwards
 */
static void
intersection_set_destroy (struct SetState *set_state)
{
  GNUNET_free (set_state);
}
684 * Remove the element given in the element message from the set.
686 * @param set_state state of the set to remove from
687 * @param element set element to remove
/* Registered as the `.remove` callback of the vtable returned by
   _GSS_intersection_vt().
   NOTE(review): the body of this function is not visible in this
   excerpt, so nothing is stated here about what it does with
   set_state or element. */
690 intersection_remove (struct SetState *set_state, struct ElementEntry *element)
697 * Dispatch messages for a intersection operation.
699 * @param eo the state of the intersection evaluate operation
700 * @param mh the received message
701 * @return GNUNET_SYSERR if the tunnel should be disconnected,
702 * GNUNET_OK otherwise
705 intersection_handle_p2p_message (struct OperationState *eo,
706 const struct GNUNET_MessageHeader *mh)
708 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
709 ntohs (mh->type), ntohs (mh->size));
710 switch (ntohs (mh->type))
712 /* this message handler is not active until after we received an
713 * operation request message, thus the ops request is not handled here
715 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
716 handle_p2p_bf (eo, mh);
718 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
719 handle_p2p_done (eo, mh);
722 /* something wrong with mesh's message handlers? */
729 * Signal to the client that the operation has finished and
730 * destroy the operation.
732 * @param cls operation to destroy
735 send_done_and_destroy (void *cls)
737 struct Operation *op = cls;
738 struct GNUNET_MQ_Envelope *ev;
739 struct GNUNET_SET_ResultMessage *rm;
740 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
741 rm->request_id = htonl (op->spec->client_request_id);
742 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
743 rm->element_type = htons (0);
744 GNUNET_MQ_send (op->spec->set->client_mq, ev);
745 _GSS_operation_destroy (op);
749 * Send a result message to the client indicating
750 * that the operation is over.
751 * After the result done message has been sent to the client,
752 * destroy the evaluate operation.
754 * @param op union operation
757 finish_and_destroy (struct Operation *op)
759 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
761 if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
763 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
764 GNUNET_assert (NULL == op->state->full_result_iter);
765 op->state->full_result_iter =
766 GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->contained_elements);
769 send_done_and_destroy (op);
774 intersection_peer_disconnect (struct Operation *op)
776 if (PHASE_FINISHED != op->state->phase)
778 struct GNUNET_MQ_Envelope *ev;
779 struct GNUNET_SET_ResultMessage *msg;
781 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
782 msg->request_id = htonl (op->spec->client_request_id);
783 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
784 msg->element_type = htons (0);
785 GNUNET_MQ_send (op->spec->set->client_mq, ev);
786 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
787 _GSS_operation_destroy (op);
790 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
791 if (GNUNET_NO == op->state->client_done_sent)
792 finish_and_destroy (op);
797 * Destroy the union operation. Only things specific to the union operation are destroyed.
799 * @param op union operation to destroy
802 intersection_op_cancel (struct Operation *op)
804 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
805 /* check if the op was canceled twice */
806 GNUNET_assert (NULL != op->state);
807 if (NULL != op->state->remote_bf)
809 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
810 op->state->remote_bf = NULL;
812 if (NULL != op->state->local_bf)
814 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
815 op->state->local_bf = NULL;
817 if (NULL != op->state->contained_elements)
819 // no need to free the elements, they are still part of the set
820 GNUNET_CONTAINER_multihashmap_destroy (op->state->contained_elements);
821 op->state->contained_elements = NULL;
823 GNUNET_free (op->state);
825 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
829 _GSS_intersection_vt ()
831 static const struct SetVT intersection_vt = {
832 .create = &intersection_set_create,
833 .msg_handler = &intersection_handle_p2p_message,
834 .add = &intersection_add,
835 .remove = &intersection_remove,
836 .destroy_set = &intersection_set_destroy,
837 .evaluate = &intersection_evaluate,
838 .accept = &intersection_accept,
839 .peer_disconnect = &intersection_peer_disconnect,
840 .cancel = &intersection_op_cancel,
843 return &intersection_vt;