2 This file is part of GNUnet
3 (C) 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file set/gnunet-service-set_intersection.c
23 * @brief two-peer set intersection
24 * @author Christian Fuchs
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
29 #include "strata_estimator.h"
30 #include "set_protocol.h"
35 * Number of IBFs in a strata estimator.
37 #define SE_STRATA_COUNT 32
39 * Size of the IBFs in the strata estimator.
41 #define SE_IBF_SIZE 80
43 * hash num parameter for the difference digests and strata estimators
45 #define SE_IBF_HASH_NUM 4
48 * Number of buckets that can be transmitted in one message.
50 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
53 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
54 * Choose this value so that computing the IBF is still cheaper
55 * than transmitting all values.
57 #define MAX_IBF_ORDER (16)
60 * Number of buckets used in the ibf per estimated
67 * Current phase we are in for a intersection operation.
69 enum IntersectionOperationPhase
72 * We sent the request message, and expect a BF
76 * We sent the request message, and expect a BF
80 * The protocol is over.
81 * Results may still have to be sent to the client.
88 * State of an evaluate operation
94 * Tunnel to the remote peer.
96 struct GNUNET_MESH_Tunnel *tunnel;
99 * Detail information about the set operation,
100 * including the set to use.
102 struct OperationSpecification *spec;
105 * Message queue for the peer.
107 struct GNUNET_MQ_Handle *mq;
110 * The bf we currently receive
112 struct BloomFilter *remote_bf;
115 * BF of the set's element.
117 struct BloomFilter *local_bf;
120 * Current state of the operation.
122 enum IntersectionOperationPhase phase;
125 * Generation in which the operation handle
128 unsigned int generation_created;
131 * Set state of the set that this operation
137 * Evaluate operations are held in
140 struct OperationState *next;
143 * Evaluate operations are held in
146 struct OperationState *prev;
149 * Did we send the client that we are done?
151 int client_done_sent;
156 * The key entry is used to associate an ibf key with
162 * IBF key for the entry, derived from the current salt.
164 struct IBF_Key ibf_key;
167 * The actual element associated with the key
169 struct ElementEntry *element;
172 * Element that collides with this element
175 struct KeyEntry *next_colliding;
180 * Used as a closure for sending elements
181 * with a specific IBF key.
183 struct SendElementClosure
186 * The IBF key whose matching elements should be
189 struct IBF_Key ibf_key;
192 * Operation for which the elements
195 struct OperationState *eo;
200 * Extra state required for efficient set intersection.
205 * The strata estimator is only generated once for
207 * The IBF keys are derived from the element hashes with
210 struct StrataEstimator *se;
213 * Evaluate operations are held in
216 struct OperationState *ops_head;
219 * Evaluate operations are held in
222 struct OperationState *ops_tail;
227 * Iterator over hash map entries.
230 * @param key current key code
231 * @param value value in the hash map
232 * @return GNUNET_YES if we should continue to
237 destroy_key_to_element_iter (void *cls,
241 struct KeyEntry *k = value;
245 struct KeyEntry *k_tmp = k;
246 k = k->next_colliding;
247 if (GNUNET_YES == k_tmp->element->remote)
249 GNUNET_free (k_tmp->element);
250 k_tmp->element = NULL;
259 * Destroy a intersection operation, and free all resources
260 * associated with it.
262 * @param eo the intersection operation to destroy
265 intersection_operation_destroy (struct OperationState *eo)
267 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
268 GNUNET_CONTAINER_DLL_remove (eo->set->state->ops_head,
269 eo->set->state->ops_tail,
273 GNUNET_MQ_destroy (eo->mq);
276 if (NULL != eo->tunnel)
278 struct GNUNET_MESH_Tunnel *t = eo->tunnel;
280 GNUNET_MESH_tunnel_destroy (t);
282 // TODO: destroy set elements?
283 if (NULL != eo->spec)
285 if (NULL != eo->spec->context_msg)
287 GNUNET_free (eo->spec->context_msg);
288 eo->spec->context_msg = NULL;
290 GNUNET_free (eo->spec);
295 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
297 /* FIXME: do a garbage collection of the set generations */
302 * Inform the client that the intersection operation has failed,
303 * and proceed to destroy the evaluate operation.
305 * @param eo the intersection operation to fail
308 fail_intersection_operation (struct OperationState *eo)
310 struct GNUNET_MQ_Envelope *ev;
311 struct GNUNET_SET_ResultMessage *msg;
313 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
314 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
315 msg->request_id = htonl (eo->spec->client_request_id);
316 msg->element_type = htons (0);
317 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
318 intersection_operation_destroy (eo);
323 * Derive the IBF key from a hash code and
326 * @param src the hash code
327 * @param salt salt to use
328 * @return the derived IBF key
330 static struct IBF_Key
331 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
335 GNUNET_CRYPTO_hkdf (&key, sizeof (key),
336 GCRY_MD_SHA512, GCRY_MD_SHA256,
338 &salt, sizeof (salt),
345 * Send a request for the evaluate operation to a remote peer
347 * @param eo operation with the other peer
350 send_operation_request (struct OperationState *eo)
352 struct GNUNET_MQ_Envelope *ev;
353 struct OperationRequestMessage *msg;
355 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
356 eo->spec->context_msg);
360 /* the context message is too large */
362 GNUNET_SERVER_client_disconnect (eo->spec->set->client);
365 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
366 msg->app_id = eo->spec->app_id;
367 msg->salt = htonl (eo->spec->salt);
368 GNUNET_MQ_send (eo->mq, ev);
370 if (NULL != eo->spec->context_msg)
371 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
373 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
375 if (NULL != eo->spec->context_msg)
377 GNUNET_free (eo->spec->context_msg);
378 eo->spec->context_msg = NULL;
385 * Iterator to create the mapping between ibf keys
386 * and element entries.
389 * @param key current key code
390 * @param value value in the hash map
391 * @return GNUNET_YES if we should continue to
396 op_register_element_iterator (void *cls,
400 struct KeyEntry *const new_k = cls;
401 struct KeyEntry *old_k = value;
403 GNUNET_assert (NULL != old_k);
406 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
408 new_k->next_colliding = old_k->next_colliding;
409 old_k->next_colliding = new_k;
412 old_k = old_k->next_colliding;
413 } while (NULL != old_k);
419 * Insert an element into the intersection operation's
420 * key-to-element mapping. Takes ownership of 'ee'.
421 * Note that this does not insert the element in the set,
422 * only in the operation's key-element mapping.
423 * This is done to speed up re-tried operations, if some elements
424 * were transmitted, and then the IBF fails to decode.
426 * @param eo the intersection operation
427 * @param ee the element entry
430 op_register_element (struct OperationState *eo, struct ElementEntry *ee)
433 struct IBF_Key ibf_key;
436 ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt);
437 k = GNUNET_new (struct KeyEntry);
439 k->ibf_key = ibf_key;
440 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
441 (uint32_t) ibf_key.key_val,
442 op_register_element_iterator, k);
444 /* was the element inserted into a colliding bucket? */
445 if (GNUNET_SYSERR == ret)
448 GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
449 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
455 * Iterator for initializing the
456 * key-to-element mapping of a intersection operation
458 * @param cls the intersection operation
460 * @param value the element entry to insert
461 * into the key-to-element mapping
462 * @return GNUNET_YES to continue iterating,
466 init_key_to_element_iterator (void *cls,
467 const struct GNUNET_HashCode *key,
470 struct OperationState *eo = cls;
471 struct ElementEntry *e = value;
473 /* make sure that the element belongs to the set at the time
474 * of creating the operation */
475 if ( (e->generation_added > eo->generation_created) ||
476 ( (GNUNET_YES == e->removed) &&
477 (e->generation_removed < eo->generation_created)))
480 GNUNET_assert (GNUNET_NO == e->remote);
482 op_register_element (eo, e);
487 * Handle an IBF message from a remote peer.
489 * @param cls the intersection operation
490 * @param mh the header of the message
493 handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
495 struct OperationState *eo = cls;
496 struct BFMessage *msg = (struct BFMessage *) mh;
497 unsigned int buckets_in_message;
499 if (eo->phase == PHASE_EXPECT_INITIAL )
501 eo->phase = PHASE_BF_EXCHANGE;
503 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new bf of size %u\n", 1<<msg->order);
505 // if (the remote peer has less elements than us)
506 // run our elements through his bloomfilter
507 // else if (we have the same elements)
510 // evict elements we can exclude through the bloomfilter
512 // create a new bloomfilter over our remaining elements
514 // send our new count and the bloomfilter back
516 else if (eo->phase == PHASE_BF_EXCHANGE)
525 * Send a result message to the client indicating
526 * that there is a new element.
528 * @param eo intersection operation
529 * @param element element to send
532 send_client_element (struct OperationState *eo,
533 struct GNUNET_SET_Element *element)
535 struct GNUNET_MQ_Envelope *ev;
536 struct GNUNET_SET_ResultMessage *rm;
538 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
539 GNUNET_assert (0 != eo->spec->client_request_id);
540 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
543 GNUNET_MQ_discard (ev);
547 rm->result_status = htons (GNUNET_SET_STATUS_OK);
548 rm->request_id = htonl (eo->spec->client_request_id);
549 rm->element_type = element->type;
550 memcpy (&rm[1], element->data, element->size);
551 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
556 * Send a result message to the client indicating
557 * that the operation is over.
558 * After the result done message has been sent to the client,
559 * destroy the evaluate operation.
561 * @param eo intersection operation
564 send_client_done_and_destroy (struct OperationState *eo)
566 struct GNUNET_MQ_Envelope *ev;
567 struct GNUNET_SET_ResultMessage *rm;
569 GNUNET_assert (GNUNET_NO == eo->client_done_sent);
571 eo->client_done_sent = GNUNET_YES;
573 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
574 rm->request_id = htonl (eo->spec->client_request_id);
575 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
576 rm->element_type = htons (0);
577 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
579 intersection_operation_destroy (eo);
584 * Handle a done message from a remote peer
586 * @param cls the intersection operation
587 * @param mh the message
590 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
592 struct OperationState *eo = cls;
593 struct GNUNET_MQ_Envelope *ev;
595 if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
597 /* we got all requests, but still have to send our elements as response */
599 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
600 eo->phase = PHASE_FINISHED;
601 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
602 GNUNET_MQ_send (eo->mq, ev);
605 if (eo->phase == PHASE_EXPECT_ELEMENTS)
607 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
608 eo->phase = PHASE_FINISHED;
609 send_client_done_and_destroy (eo);
613 fail_intersection_operation (eo);
618 * Evaluate a intersection operation with
621 * @param spec specification of the operation the evaluate
622 * @param tunnel tunnel already connected to the partner peer
623 * @param tc tunnel context, passed here so all new incoming
624 * messages are directly going to the intersection operations
625 * @return a handle to the operation
628 intersection_evaluate (struct OperationSpecification *spec,
629 struct GNUNET_MESH_Tunnel *tunnel,
630 struct TunnelContext *tc)
632 struct OperationState *eo;
634 eo = GNUNET_new (struct OperationState);
635 tc->vt = _GSS_intersection_vt ();
637 eo->generation_created = spec->set->current_generation++;
641 eo->mq = GNUNET_MESH_mq_create (tunnel);
643 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
644 "evaluating intersection operation, (app %s)\n",
645 GNUNET_h2s (&eo->spec->app_id));
647 /* we started the operation, thus we have to send the operation request */
648 eo->phase = PHASE_EXPECT_SE;
650 GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head,
651 eo->set->state->ops_tail,
654 send_initial_bloomfilter (eo);
659 * Accept an intersection operation request from a remote peer
661 * @param spec all necessary information about the operation
662 * @param tunnel open tunnel to the partner's peer
663 * @param tc tunnel context, passed here so all new incoming
664 * messages are directly going to the intersection operations
668 intersection_accept (struct OperationSpecification *spec,
669 struct GNUNET_MESH_Tunnel *tunnel,
670 struct TunnelContext *tc)
672 struct OperationState *eo;
674 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set intersection operation\n");
676 eo = GNUNET_new (struct OperationState);
677 tc->vt = _GSS_intersection_vt ();
680 eo->generation_created = eo->set->current_generation++;
683 eo->mq = GNUNET_MESH_mq_create (tunnel);
684 /* transfer ownership of mq and socket from incoming to eo */
685 GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head,
686 eo->set->state->ops_tail,
688 /* kick off the operation */
689 send_bloomfilter (eo);
694 * Create a new set supporting the intersection operation
696 * @return the newly created set
698 static struct SetState *
699 intersection_set_create (void)
701 struct SetState *set_state;
703 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "intersection set created\n");
705 set_state = GNUNET_new (struct SetState);
707 //TODO: actually create that thing
714 * Add the element from the given element message to the set.
716 * @param set_state state of the set want to add to
717 * @param ee the element to add to the set
720 intersection_add (struct SetState *set_state, struct ElementEntry *ee)
727 * Destroy a set that supports the intersection operation
729 * @param set_state the set to destroy
732 intersection_set_destroy (struct SetState *set_state)
734 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection set\n");
735 /* important to destroy operations before the rest of the set */
736 while (NULL != set_state->ops_head)
737 intersection_operation_destroy (set_state->ops_head);
738 if (NULL != set_state->se)
740 //TODO: actually destroy that thing
741 set_state->se = NULL;
743 GNUNET_free (set_state);
748 * Remove the element given in the element message from the set.
750 * @param set_state state of the set to remove from
751 * @param element set element to remove
754 intersection_remove (struct SetState *set_state, struct ElementEntry *element)
761 * Dispatch messages for a intersection operation.
763 * @param eo the state of the intersection evaluate operation
764 * @param mh the received message
765 * @return GNUNET_SYSERR if the tunnel should be disconnected,
766 * GNUNET_OK otherwise
769 intersection_handle_p2p_message (struct OperationState *eo,
770 const struct GNUNET_MessageHeader *mh)
772 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
773 ntohs (mh->type), ntohs (mh->size));
774 switch (ntohs (mh->type))
776 case GNUNET_MESSAGE_TYPE_SET_P2P_BF:
777 handle_p2p_bf (eo, mh);
779 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
780 handle_p2p_done (eo, mh);
783 /* something wrong with mesh's message handlers? */
791 intersection_peer_disconnect (struct OperationState *op)
793 /* Are we already disconnected? */
794 if (NULL == op->tunnel)
799 GNUNET_MQ_destroy (op->mq);
802 if (PHASE_FINISHED != op->phase)
804 struct GNUNET_MQ_Envelope *ev;
805 struct GNUNET_SET_ResultMessage *msg;
807 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
808 msg->request_id = htonl (op->spec->client_request_id);
809 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
810 msg->element_type = htons (0);
811 GNUNET_MQ_send (op->spec->set->client_mq, ev);
812 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
813 intersection_operation_destroy (op);
816 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
817 if (GNUNET_NO == op->client_done_sent)
818 send_client_done_and_destroy (op);
823 intersection_op_cancel (struct SetState *set_state, uint32_t op_id)
825 /* FIXME: implement */
830 _GSS_intersection_vt ()
832 static const struct SetVT intersection_vt = {
833 .create = &intersection_set_create,
834 .msg_handler = &intersection_handle_p2p_message,
835 .add = &intersection_add,
836 .remove = &intersection_remove,
837 .destroy_set = &intersection_set_destroy,
838 .evaluate = &intersection_evaluate,
839 .accept = &intersection_accept,
840 .peer_disconnect = &intersection_peer_disconnect,
841 .cancel = &intersection_op_cancel,
844 return &intersection_vt;