2 This file is part of GNUnet
3 (C) 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file set/gnunet-service-set_intersection.c
23 * @brief two-peer set intersection
24 * @author Christian Fuchs
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "strata_estimator.h"
30 #include "set_protocol.h"
/**
 * Current phase we are in for an intersection operation.
 */
enum IntersectionOperationPhase
{
  /**
   * We got our tunnel but have not received any message so far.
   */
  PHASE_EXPECT_INITIAL,

  /**
   * We expect a bloomfilter (BF) plus the number of elements
   * the other peer has.
   */
  PHASE_BF_EXCHANGE,

  /**
   * The protocol is over.
   * Results may still have to be sent to the client.
   */
  PHASE_FINISHED
};
55 * State of an evaluate operation
61 * The bf we currently receive
63 struct GNUNET_CONTAINER_BloomFilter *remote_bf;
66 * BF of the set's element.
68 struct GNUNET_CONTAINER_BloomFilter *local_bf;
71 * Current state of the operation.
73 enum IntersectionOperationPhase phase;
76 * Generation in which the operation handle
79 unsigned int generation_created;
82 * Maps element-id-hashes to 'elements in our set'.
84 struct GNUNET_CONTAINER_MultiHashMap *contained_elements;
87 * Current element count contained within contained_elements
89 uint32_t contained_elements_count;
92 * Iterator for sending elements on the key to element mapping to the client.
94 struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
97 * Evaluate operations are held in
100 struct OperationState *next;
103 * Evaluate operations are held in
106 struct OperationState *prev;
109 * Did we send the client that we are done?
111 int client_done_sent;
117 * Destroy a intersection operation, and free all resources
118 * associated with it.
120 * @param eo the intersection operation to destroy
123 intersection_operation_destroy (struct OperationState *eo)
125 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
126 GNUNET_CONTAINER_DLL_remove (eo->set->state->ops_head,
127 eo->set->state->ops_tail,
131 GNUNET_MQ_destroy (eo->mq);
134 if (NULL != eo->tunnel)
136 struct GNUNET_MESH_Tunnel *t = eo->tunnel;
138 GNUNET_MESH_tunnel_destroy (t);
140 // TODO: destroy set elements?
141 if (NULL != eo->spec)
143 if (NULL != eo->spec->context_msg)
145 GNUNET_free (eo->spec->context_msg);
146 eo->spec->context_msg = NULL;
148 GNUNET_free (eo->spec);
153 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
155 /* FIXME: do a garbage collection of the set generations */
160 * Inform the client that the intersection operation has failed,
161 * and proceed to destroy the evaluate operation.
163 * @param eo the intersection operation to fail
166 fail_intersection_operation (struct OperationState *eo)
168 struct GNUNET_MQ_Envelope *ev;
169 struct GNUNET_SET_ResultMessage *msg;
171 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
172 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
173 msg->request_id = htonl (eo->spec->client_request_id);
174 msg->element_type = htons (0);
175 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
176 intersection_operation_destroy (eo);
181 * Send a request for the evaluate operation to a remote peer
183 * @param eo operation with the other peer
186 send_operation_request (struct Operation *op)
188 struct GNUNET_MQ_Envelope *ev;
189 struct OperationRequestMessage *msg;
191 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
192 op->spec->context_msg);
196 /* the context message is too large */
198 GNUNET_SERVER_client_disconnect (op->spec->set->client);
201 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
202 msg->app_id = op->spec->app_id;
203 msg->salt = htonl (op->spec->salt);
204 GNUNET_MQ_send (op->mq, ev);
206 if (NULL != op->spec->context_msg)
207 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
209 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
211 if (NULL != op->spec->context_msg)
213 GNUNET_free (op->spec->context_msg);
214 op->spec->context_msg = NULL;
221 * Handle an BF message from a remote peer.
223 * @param cls the intersection operation
224 * @param mh the header of the message
227 handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
229 struct OperationState *eo = cls;
230 struct BFMessage *msg = (struct BFMessage *) mh;
231 unsigned int buckets_in_message;
233 if (eo->phase == PHASE_EXPECT_INITIAL )
235 eo->phase = PHASE_BF_EXCHANGE;
237 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new bf of size %u\n", 1<<msg->order);
239 // if (the remote peer has less elements than us)
240 // run our elements through his bloomfilter
241 // else if (we have the same elements)
244 // evict elements we can exclude through the bloomfilter
246 // create a new bloomfilter over our remaining elements
248 // send our new count and the bloomfilter back
250 else if (eo->phase == PHASE_BF_EXCHANGE)
259 * Send a result message to the client indicating
260 * that there is a new element.
262 * @param eo intersection operation
263 * @param element element to send
266 send_client_element (struct OperationState *eo,
267 struct GNUNET_SET_Element *element)
269 struct GNUNET_MQ_Envelope *ev;
270 struct GNUNET_SET_ResultMessage *rm;
272 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
273 GNUNET_assert (0 != eo->spec->client_request_id);
274 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
277 GNUNET_MQ_discard (ev);
281 rm->result_status = htons (GNUNET_SET_STATUS_OK);
282 rm->request_id = htonl (eo->spec->client_request_id);
283 rm->element_type = element->type;
284 memcpy (&rm[1], element->data, element->size);
285 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
290 * Send a result message to the client indicating
291 * that the operation is over.
292 * After the result done message has been sent to the client,
293 * destroy the evaluate operation.
295 * @param eo intersection operation
298 send_client_done_and_destroy (struct OperationState *eo)
300 struct GNUNET_MQ_Envelope *ev;
301 struct GNUNET_SET_ResultMessage *rm;
303 GNUNET_assert (GNUNET_NO == eo->client_done_sent);
305 eo->client_done_sent = GNUNET_YES;
307 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
308 rm->request_id = htonl (eo->spec->client_request_id);
309 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
310 rm->element_type = htons (0);
311 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
313 intersection_operation_destroy (eo);
318 * Send a bloomfilter to our peer.
319 * that the operation is over.
320 * After the result done message has been sent to the client,
321 * destroy the evaluate operation.
323 * @param eo intersection operation
326 send_bloomfilter (struct Operation *op)
328 struct BloomFilter *bf;
329 struct GNUNET_MQ_Envelope *ev;
330 struct BFMessage *msg;
333 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n", );
335 ev = GNUNET_MQ_msg(msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
337 msg->sender_mutator = htonl (op->spec->salt);
338 msg->sender_element_count = htonl (op->state->contained_elements_count);
339 GNUNET_assert(GNUNET_SYSERR != GNUNET_CONTAINER_bloomfilter_get_raw_data(
342 GNUNET_CRYPTO_HASH_LENGTH));
344 GNUNET_MQ_send (op->mq, ev);
346 op->state->phase = PHASE_BF_EXCHANGE;
350 * Handle a done message from a remote peer
352 * @param cls the intersection operation
353 * @param mh the message
356 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
358 struct OperationState *eo = cls;
359 struct GNUNET_MQ_Envelope *ev;
361 if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
363 /* we got all requests, but still have to send our elements as response */
365 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
366 eo->phase = PHASE_FINISHED;
367 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
368 GNUNET_MQ_send (eo->mq, ev);
371 if (eo->phase == PHASE_EXPECT_ELEMENTS)
373 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
374 eo->phase = PHASE_FINISHED;
375 send_client_done_and_destroy (eo);
379 fail_intersection_operation (eo);
384 * Evaluate a union operation with
387 * @param op operation to evaluate
390 intersection_evaluate (struct Operation *op)
392 op->state = GNUNET_new (struct OperationState);
393 /* we started the operation, thus we have to send the operation request */
394 op->state->phase = PHASE_BF_EXCHANGE;
395 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating intersection operation");
396 send_operation_request (op);
403 * fills the contained-elements hashmap with all relevant
404 * elements and adds their mutated hashes to our local bloomfilter with mutator+1
407 * @param key current key code
408 * @param value value in the hash map
409 * @return #GNUNET_YES if we should continue to
414 intersection_iterator_set_to_contained_alice (void *cls,
415 const struct GNUNET_HashCode *key,
417 struct ElementEntry *ee = value;
418 struct Operation *op = cls;
419 struct GNUNET_HashCode mutated_hash;
421 //only consider this element, if it is valid for us
422 if ((op->generation_created >= ee->generation_removed)
423 || (op->generation_created < ee->generation_added))
426 // not contained according to bob's bloomfilter
427 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
428 if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
432 op->state->contained_elements_count++;
433 GNUNET_CONTAINER_multihashmap_put (op->state->contained_elements,
434 &ee->element_hash, ee,
435 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
437 // create our own bloomfilter with salt+1
438 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt+1, &mutated_hash);
439 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
448 * fills the contained-elements hashmap with all relevant
449 * elements and adds their mutated hashes to our local bloomfilter
452 * @param key current key code
453 * @param value value in the hash map
454 * @return #GNUNET_YES if we should continue to
459 intersection_iterator_set_to_contained_bob (void *cls,
460 const struct GNUNET_HashCode *key,
462 struct ElementEntry *ee = value;
463 struct Operation *op = cls;
464 struct GNUNET_HashCode mutated_hash;
466 //only consider this element, if it is valid for us
467 if ((op->generation_created >= ee->generation_removed)
468 || (op->generation_created < ee->generation_added))
471 GNUNET_CONTAINER_multihashmap_put (op->state->contained_elements,
472 &ee->element_hash, ee,
473 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
475 op->state->contained_elements_count++;
477 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
479 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
486 * removes element from a hashmap if it is not contained within the
487 * provided remote bloomfilter.
490 * @param key current key code
491 * @param value value in the hash map
492 * @return #GNUNET_YES if we should continue to
497 intersection_iterator_element_removal (void *cls,
498 const struct GNUNET_HashCode *key,
500 struct ElementEntry *ee = value;
501 struct Operation *op = cls;
502 struct GNUNET_HashCode mutated_hash;
504 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
506 if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
508 op->state->contained_elements_count--;
509 GNUNET_CONTAINER_multihashmap_remove (op->state->contained_elements,
518 * removes element from a hashmap if it is not contained within the
519 * provided remote bloomfilter.
522 * @param key current key code
523 * @param value value in the hash map
524 * @return #GNUNET_YES if we should continue to
529 intersection_iterator_create_bf (void *cls,
530 const struct GNUNET_HashCode *key,
532 struct ElementEntry *ee = value;
533 struct Operation *op = cls;
534 struct GNUNET_HashCode mutated_hash;
536 GNUNET_BLOCK_mingle_hash(&ee->element_hash, op->spec->salt, &mutated_hash);
538 GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
546 * Accept an union operation request from a remote peer.
547 * Only initializes the private operation state.
549 * @param op operation that will be accepted as a union operation
552 intersection_accept (struct Operation *op)
554 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
555 op->state = GNUNET_new (struct OperationState);
557 op->state->contained_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
558 op->state-> = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
560 op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init(NULL, , GNUNET_CONSTANTS_BLOOMFILTER_K);
562 GNUNET_CONTAINER_multihashmap_iterate(op->spec->set->elements,
563 &intersection_iterator_set_to_contained_bob,
565 /* kick off the operation */
566 send_bloomfilter (op);
571 * Create a new set supporting the intersection operation
573 * @return the newly created set
575 static struct SetState *
576 intersection_set_create (void)
578 struct SetState *set_state;
580 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "intersection set created\n");
582 set_state = GNUNET_new (struct SetState);
/**
 * Add the element from the given element message to the set.
 *
 * @param set_state state of the set we want to add to
 * @param ee the element to add to the set
 */
static void
intersection_add (struct SetState *set_state, struct ElementEntry *ee)
{
  /* NOTE(review): no intersection-specific work is visible here */
  (void) set_state;
  (void) ee;
}
/**
 * Destroy a set that supports the intersection operation.
 *
 * Only the set state itself is released.
 *
 * @param set_state the set to destroy
 */
static void
intersection_set_destroy (struct SetState *set_state)
{
  GNUNET_free (set_state);
}
/**
 * Remove the element given in the element message from the set.
 *
 * @param set_state state of the set to remove from
 * @param element set element to remove
 */
static void
intersection_remove (struct SetState *set_state, struct ElementEntry *element)
{
  /* NOTE(review): no intersection-specific work is visible here */
  (void) set_state;
  (void) element;
}
627 * Dispatch messages for a intersection operation.
629 * @param eo the state of the intersection evaluate operation
630 * @param mh the received message
631 * @return GNUNET_SYSERR if the tunnel should be disconnected,
632 * GNUNET_OK otherwise
635 intersection_handle_p2p_message (struct OperationState *eo,
636 const struct GNUNET_MessageHeader *mh)
638 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
639 ntohs (mh->type), ntohs (mh->size));
640 switch (ntohs (mh->type))
642 /* this message handler is not active until after we received an
643 * operation request message, thus the ops request is not handled here
645 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
646 handle_p2p_bf (eo, mh);
648 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
649 handle_p2p_done (eo, mh);
652 /* something wrong with mesh's message handlers? */
659 * Signal to the client that the operation has finished and
660 * destroy the operation.
662 * @param cls operation to destroy
665 send_done_and_destroy (void *cls)
667 struct Operation *op = cls;
668 struct GNUNET_MQ_Envelope *ev;
669 struct GNUNET_SET_ResultMessage *rm;
670 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
671 rm->request_id = htonl (op->spec->client_request_id);
672 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
673 rm->element_type = htons (0);
674 GNUNET_MQ_send (op->spec->set->client_mq, ev);
675 _GSS_operation_destroy (op);
679 * Send a result message to the client indicating
680 * that the operation is over.
681 * After the result done message has been sent to the client,
682 * destroy the evaluate operation.
684 * @param op union operation
687 finish_and_destroy (struct Operation *op)
689 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
691 if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
693 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
694 GNUNET_assert (NULL == op->state->full_result_iter);
695 op->state->full_result_iter =
696 GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->contained_elements);
699 send_done_and_destroy (op);
703 * handler for peer-disconnects, notifies the client about the aborted operation
705 * @param op the destroyed operation
708 intersection_peer_disconnect (struct Operation *op)
710 if (PHASE_FINISHED != op->state->phase)
712 struct GNUNET_MQ_Envelope *ev;
713 struct GNUNET_SET_ResultMessage *msg;
715 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
716 msg->request_id = htonl (op->spec->client_request_id);
717 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
718 msg->element_type = htons (0);
719 GNUNET_MQ_send (op->spec->set->client_mq, ev);
720 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
721 _GSS_operation_destroy (op);
724 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
725 if (GNUNET_NO == op->state->client_done_sent)
726 finish_and_destroy (op);
731 * Destroy the union operation. Only things specific to the union operation are destroyed.
733 * @param op union operation to destroy
736 intersection_op_cancel (struct Operation *op)
738 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
739 /* check if the op was canceled twice */
740 GNUNET_assert (NULL != op->state);
741 if (NULL != op->state->remote_bf)
743 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
744 op->state->remote_bf = NULL;
746 if (NULL != op->state->local_bf)
748 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
749 op->state->local_bf = NULL;
751 if (NULL != op->state->contained_elements)
753 // no need to free the elements, they are still part of the set
754 GNUNET_CONTAINER_multihashmap_destroy (op->state->contained_elements);
755 op->state->contained_elements = NULL;
757 GNUNET_free (op->state);
759 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
763 _GSS_intersection_vt ()
765 static const struct SetVT intersection_vt = {
766 .create = &intersection_set_create,
767 .msg_handler = &intersection_handle_p2p_message,
768 .add = &intersection_add,
769 .remove = &intersection_remove,
770 .destroy_set = &intersection_set_destroy,
771 .evaluate = &intersection_evaluate,
772 .accept = &intersection_accept,
773 .peer_disconnect = &intersection_peer_disconnect,
774 .cancel = &intersection_op_cancel,
777 return &intersection_vt;