2 This file is part of GNUnet
3 (C) 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file set/gnunet-service-set.c
23 * @brief two-peer set operations
24 * @author Florian Dold
27 #include "gnunet_util_lib.h"
28 #include "gnunet-service-set.h"
30 #include "strata_estimator.h"
31 #include "set_protocol.h"
36 * Number of IBFs in a strata estimator.
38 #define SE_STRATA_COUNT 32
40 * Size of the IBFs in the strata estimator.
42 #define SE_IBF_SIZE 80
44 * hash num parameter for the difference digests and strata estimators
46 #define SE_IBF_HASH_NUM 4
49 * Number of buckets that can be transmitted in one message.
51 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
54 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
55 * Choose this value so that computing the IBF is still cheaper
56 * than transmitting all values.
58 #define MAX_IBF_ORDER (16)
61 * Number of buckets used in the ibf per estimated
68 * Current phase we are in for a union operation.
70 enum UnionOperationPhase
73 * We sent the request message, and expect a strata estimator
77 * We sent the strata estimator, and expect an IBF
81 * We know what type of IBF the other peer wants to send us,
82 * and expect the remaining parts
84 PHASE_EXPECT_IBF_CONT,
86 * We are sending request and elements,
87 * and thus only expect elements from the other peer.
89 PHASE_EXPECT_ELEMENTS,
91 * We are expecting elements and requests, and send
92 * requested elements back to the other peer.
94 PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
96 * The protocol is over.
97 * Results may still have to be sent to the client.
104 * State of an evaluate operation
107 struct OperationState
110 * Tunnel to the remote peer.
112 struct GNUNET_MESH_Tunnel *tunnel;
115 * Message queue for the peer.
117 struct GNUNET_MQ_Handle *mq;
120 * Number of ibf buckets received
122 unsigned int ibf_buckets_received;
125 * Copy of the set's strata estimator at the time of
126 * creation of this operation
128 struct StrataEstimator *se;
131 * The ibf we currently receive
133 struct InvertibleBloomFilter *remote_ibf;
136 * IBF of the set's element.
138 struct InvertibleBloomFilter *local_ibf;
141 * Maps IBF-Keys (specific to the current salt) to elements.
142 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
143 * Colliding IBF-Keys are linked.
145 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
148 * Iterator for sending elements on the key to element mapping to the client.
150 struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
153 * Current state of the operation.
155 enum UnionOperationPhase phase;
158 * Did we send the client that we are done?
160 int client_done_sent;
165 * The key entry is used to associate an ibf key with
171 * IBF key for the entry, derived from the current salt.
173 struct IBF_Key ibf_key;
176 * The actual element associated with the key.
178 struct ElementEntry *element;
181 * Element that collides with this element
182 * on the ibf key. All colliding entries must have the same ibf key.
184 struct KeyEntry *next_colliding;
189 * Used as a closure for sending elements
190 * with a specific IBF key.
192 struct SendElementClosure
195 * The IBF key whose matching elements should be
198 struct IBF_Key ibf_key;
201 * Operation for which the elements
204 struct Operation *op;
209 * Extra state required for efficient set union.
214 * The strata estimator is only generated once for
216 * The IBF keys are derived from the element hashes with
219 struct StrataEstimator *se;
224 * Iterator over hash map entries.
227 * @param key current key code
228 * @param value value in the hash map
229 * @return #GNUNET_YES if we should continue to
234 destroy_key_to_element_iter (void *cls,
238 struct KeyEntry *k = value;
239 /* destroy the linked list of colliding ibf key entries */
242 struct KeyEntry *k_tmp = k;
243 k = k->next_colliding;
244 if (GNUNET_YES == k_tmp->element->remote)
246 GNUNET_free (k_tmp->element);
247 k_tmp->element = NULL;
256 * Destroy the union operation. Only things specific to the union operation are destroyed.
258 * @param op union operation to destroy
261 union_op_cancel (struct Operation *op)
263 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
264 /* check if the op was canceled twice */
265 GNUNET_assert (NULL != op->state);
266 if (NULL != op->state->remote_ibf)
268 ibf_destroy (op->state->remote_ibf);
269 op->state->remote_ibf = NULL;
271 if (NULL != op->state->local_ibf)
273 ibf_destroy (op->state->local_ibf);
274 op->state->local_ibf = NULL;
276 if (NULL != op->state->se)
278 strata_estimator_destroy (op->state->se);
279 op->state->se = NULL;
281 if (NULL != op->state->key_to_element)
283 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, destroy_key_to_element_iter, NULL);
284 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
285 op->state->key_to_element = NULL;
287 GNUNET_free (op->state);
289 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
294 * Inform the client that the union operation has failed,
295 * and proceed to destroy the evaluate operation.
297 * @param op the union operation to fail
300 fail_union_operation (struct Operation *op)
302 struct GNUNET_MQ_Envelope *ev;
303 struct GNUNET_SET_ResultMessage *msg;
305 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "union operation failed\n");
307 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
308 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
309 msg->request_id = htonl (op->spec->client_request_id);
310 msg->element_type = htons (0);
311 GNUNET_MQ_send (op->spec->set->client_mq, ev);
312 _GSS_operation_destroy (op);
317 * Derive the IBF key from a hash code and
320 * @param src the hash code
321 * @param salt salt to use
322 * @return the derived IBF key
324 static struct IBF_Key
325 get_ibf_key (const struct GNUNET_HashCode *src, uint16_t salt)
329 GNUNET_CRYPTO_hkdf (&key, sizeof (key),
330 GCRY_MD_SHA512, GCRY_MD_SHA256,
332 &salt, sizeof (salt),
339 * Send a request for the evaluate operation to a remote peer
341 * @param op operation with the other peer
344 send_operation_request (struct Operation *op)
346 struct GNUNET_MQ_Envelope *ev;
347 struct OperationRequestMessage *msg;
349 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
350 op->spec->context_msg);
354 /* the context message is too large */
356 GNUNET_SERVER_client_disconnect (op->spec->set->client);
359 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
360 msg->app_id = op->spec->app_id;
361 msg->salt = htonl (op->spec->salt);
362 GNUNET_MQ_send (op->mq, ev);
364 if (NULL != op->spec->context_msg)
365 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
367 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
369 if (NULL != op->spec->context_msg)
371 GNUNET_free (op->spec->context_msg);
372 op->spec->context_msg = NULL;
378 * Iterator to create the mapping between ibf keys
379 * and element entries.
382 * @param key current key code
383 * @param value value in the hash map
384 * @return #GNUNET_YES if we should continue to
389 op_register_element_iterator (void *cls,
393 struct KeyEntry *const new_k = cls;
394 struct KeyEntry *old_k = value;
396 GNUNET_assert (NULL != old_k);
397 /* check if our ibf key collides with the ibf key in the existing entry */
398 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
400 /* insert the the new key in the collision chain */
401 new_k->next_colliding = old_k->next_colliding;
402 old_k->next_colliding = new_k;
403 /* signal to the caller that we were able to insert into a colliding bucket */
411 * Iterator to create the mapping between ibf keys
412 * and element entries.
415 * @param key current key code
416 * @param value value in the hash map
417 * @return #GNUNET_YES if we should continue to
422 op_has_element_iterator (void *cls,
426 struct GNUNET_HashCode *element_hash = cls;
427 struct KeyEntry *k = value;
429 GNUNET_assert (NULL != k);
432 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, element_hash))
434 k = k->next_colliding;
441 * Determine whether the given element is already in the operation's element
444 * @param op operation that should be tested for 'element_hash'
445 * @param element_hash hash of the element to look for
446 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
449 op_has_element (struct Operation *op, const struct GNUNET_HashCode *element_hash)
452 struct IBF_Key ibf_key;
454 ibf_key = get_ibf_key (element_hash, op->spec->salt);
455 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
456 (uint32_t) ibf_key.key_val,
457 op_has_element_iterator, (void *) element_hash);
459 /* was the iteration aborted because we found the element? */
460 if (GNUNET_SYSERR == ret)
467 * Insert an element into the union operation's
468 * key-to-element mapping. Takes ownership of 'ee'.
469 * Note that this does not insert the element in the set,
470 * only in the operation's key-element mapping.
471 * This is done to speed up re-tried operations, if some elements
472 * were transmitted, and then the IBF fails to decode.
474 * @param op the union operation
475 * @param ee the element entry
478 op_register_element (struct Operation *op, struct ElementEntry *ee)
481 struct IBF_Key ibf_key;
484 ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
485 k = GNUNET_new (struct KeyEntry);
487 k->ibf_key = ibf_key;
488 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
489 (uint32_t) ibf_key.key_val,
490 op_register_element_iterator, k);
492 /* was the element inserted into a colliding bucket? */
493 if (GNUNET_SYSERR == ret)
496 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t) ibf_key.key_val, k,
497 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
502 * Insert a key into an ibf.
506 * @param value the key entry to get the key from
509 prepare_ibf_iterator (void *cls,
513 struct InvertibleBloomFilter *ibf = cls;
514 struct KeyEntry *ke = value;
516 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting %x into ibf\n", ke->ibf_key.key_val);
518 ibf_insert (ibf, ke->ibf_key);
524 * Iterator for initializing the
525 * key-to-element mapping of a union operation
527 * @param cls the union operation
529 * @param value the element entry to insert
530 * into the key-to-element mapping
531 * @return GNUNET_YES to continue iterating,
535 init_key_to_element_iterator (void *cls,
536 const struct GNUNET_HashCode *key,
539 struct Operation *op = cls;
540 struct ElementEntry *e = value;
542 /* make sure that the element belongs to the set at the time
543 * of creating the operation */
544 if ( (e->generation_added > op->generation_created) ||
545 ( (GNUNET_YES == e->removed) &&
546 (e->generation_removed < op->generation_created)))
549 GNUNET_assert (GNUNET_NO == e->remote);
551 op_register_element (op, e);
557 * Create an ibf with the operation's elements
558 * of the specified size
560 * @param op the union operation
561 * @param size size of the ibf to create
564 prepare_ibf (struct Operation *op, uint16_t size)
566 if (NULL == op->state->key_to_element)
569 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements);
570 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
571 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
572 init_key_to_element_iterator, op);
574 if (NULL != op->state->local_ibf)
575 ibf_destroy (op->state->local_ibf);
576 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
577 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
578 prepare_ibf_iterator, op->state->local_ibf);
583 * Send an ibf of appropriate size.
585 * @param op the union operation
586 * @param ibf_order order of the ibf to send, size=2^order
589 send_ibf (struct Operation *op, uint16_t ibf_order)
591 unsigned int buckets_sent = 0;
592 struct InvertibleBloomFilter *ibf;
594 prepare_ibf (op, 1<<ibf_order);
596 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
598 ibf = op->state->local_ibf;
600 while (buckets_sent < (1 << ibf_order))
602 unsigned int buckets_in_message;
603 struct GNUNET_MQ_Envelope *ev;
604 struct IBFMessage *msg;
606 buckets_in_message = (1 << ibf_order) - buckets_sent;
607 /* limit to maximum */
608 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
609 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
611 ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
612 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
614 msg->order = ibf_order;
615 msg->offset = htons (buckets_sent);
616 ibf_write_slice (ibf, buckets_sent,
617 buckets_in_message, &msg[1]);
618 buckets_sent += buckets_in_message;
619 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
620 buckets_in_message, buckets_sent, 1<<ibf_order);
621 GNUNET_MQ_send (op->mq, ev);
624 op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
629 * Send a strata estimator to the remote peer.
631 * @param op the union operation with the remote peer
634 send_strata_estimator (struct Operation *op)
636 struct GNUNET_MQ_Envelope *ev;
637 struct GNUNET_MessageHeader *strata_msg;
639 ev = GNUNET_MQ_msg_header_extra (strata_msg,
640 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
641 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE);
642 strata_estimator_write (op->state->se, &strata_msg[1]);
643 GNUNET_MQ_send (op->mq, ev);
644 op->state->phase = PHASE_EXPECT_IBF;
645 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
650 * Compute the necessary order of an ibf
651 * from the size of the symmetric set difference.
653 * @param diff the difference
654 * @return the required size of the ibf
657 get_order_from_difference (unsigned int diff)
659 unsigned int ibf_order;
662 while ((1<<ibf_order) < (IBF_ALPHA * diff) || (1<<ibf_order) < SE_IBF_HASH_NUM)
664 if (ibf_order > MAX_IBF_ORDER)
665 ibf_order = MAX_IBF_ORDER;
671 * Handle a strata estimator from a remote peer
673 * @param cls the union operation
674 * @param mh the message
677 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
679 struct Operation *op = cls;
680 struct StrataEstimator *remote_se;
683 if (op->state->phase != PHASE_EXPECT_SE)
685 fail_union_operation (op);
689 remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
691 strata_estimator_read (&mh[1], remote_se);
692 GNUNET_assert (NULL != op->state->se);
693 diff = strata_estimator_difference (remote_se, op->state->se);
694 strata_estimator_destroy (remote_se);
695 strata_estimator_destroy (op->state->se);
696 op->state->se = NULL;
697 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n",
698 diff, 1<<get_order_from_difference (diff));
699 send_ibf (op, get_order_from_difference (diff));
705 * Iterator to send elements to a remote peer
707 * @param cls closure with the element key and the union operation
709 * @param value the key entry
712 send_element_iterator (void *cls,
716 struct SendElementClosure *sec = cls;
717 struct IBF_Key ibf_key = sec->ibf_key;
718 struct Operation *op = sec->op;
719 struct KeyEntry *ke = value;
721 if (ke->ibf_key.key_val != ibf_key.key_val)
725 const struct GNUNET_SET_Element *const element = &ke->element->element;
726 struct GNUNET_MQ_Envelope *ev;
727 struct GNUNET_MessageHeader *mh;
729 GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
730 ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
733 /* element too large */
737 memcpy (&mh[1], element->data, element->size);
738 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n",
739 GNUNET_h2s (&ke->element->element_hash));
740 GNUNET_MQ_send (op->mq, ev);
741 ke = ke->next_colliding;
747 * Send all elements that have the specified IBF key
748 * to the remote peer of the union operation
750 * @param op union operation
751 * @param ibf_key IBF key of interest
754 send_elements_for_key (struct Operation *op, struct IBF_Key ibf_key)
756 struct SendElementClosure send_cls;
758 send_cls.ibf_key = ibf_key;
760 GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, (uint32_t) ibf_key.key_val,
761 &send_element_iterator, &send_cls);
766 * Decode which elements are missing on each side, and
767 * send the appropriate elemens and requests
769 * @param op union operation
772 decode_and_send (struct Operation *op)
775 struct IBF_Key last_key;
777 unsigned int num_decoded;
778 struct InvertibleBloomFilter *diff_ibf;
780 GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase);
782 prepare_ibf (op, op->state->remote_ibf->size);
783 diff_ibf = ibf_dup (op->state->local_ibf);
784 ibf_subtract (diff_ibf, op->state->remote_ibf);
786 ibf_destroy (op->state->remote_ibf);
787 op->state->remote_ibf = NULL;
789 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size);
792 last_key.key_val = 0;
797 int cycle_detected = GNUNET_NO;
801 res = ibf_decode (diff_ibf, &side, &key);
802 if (res == GNUNET_OK)
804 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n",
807 if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val))
809 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n",
810 num_decoded, diff_ibf->size);
811 cycle_detected = GNUNET_YES;
814 if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected))
818 while (1<<next_order < diff_ibf->size)
821 if (next_order <= MAX_IBF_ORDER)
823 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
824 "decoding failed, sending larger ibf (size %u)\n",
826 send_ibf (op, next_order);
830 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
831 "set union failed: reached ibf limit\n");
835 if (GNUNET_NO == res)
837 struct GNUNET_MQ_Envelope *ev;
839 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
840 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
841 GNUNET_MQ_send (op->mq, ev);
846 send_elements_for_key (op, key);
850 struct GNUNET_MQ_Envelope *ev;
851 struct GNUNET_MessageHeader *msg;
853 /* It may be nice to merge multiple requests, but with mesh's corking it is not worth
854 * the effort additional complexity. */
855 ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
856 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
858 *(struct IBF_Key *) &msg[1] = key;
859 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n");
860 GNUNET_MQ_send (op->mq, ev);
867 ibf_destroy (diff_ibf);
872 * Handle an IBF message from a remote peer.
874 * @param cls the union operation
875 * @param mh the header of the message
878 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
880 struct Operation *op = cls;
881 struct IBFMessage *msg = (struct IBFMessage *) mh;
882 unsigned int buckets_in_message;
884 if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
885 (op->state->phase == PHASE_EXPECT_IBF) )
887 op->state->phase = PHASE_EXPECT_IBF_CONT;
888 GNUNET_assert (NULL == op->state->remote_ibf);
889 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
890 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
891 op->state->ibf_buckets_received = 0;
892 if (0 != ntohs (msg->offset))
895 fail_union_operation (op);
899 else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
901 if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
902 (1<<msg->order != op->state->remote_ibf->size) )
905 fail_union_operation (op);
910 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
912 if (0 == buckets_in_message)
915 fail_union_operation (op);
919 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
922 fail_union_operation (op);
926 ibf_read_slice (&msg[1], op->state->ibf_buckets_received, buckets_in_message, op->state->remote_ibf);
927 op->state->ibf_buckets_received += buckets_in_message;
929 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
931 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
932 op->state->phase = PHASE_EXPECT_ELEMENTS;
933 decode_and_send (op);
939 * Send a result message to the client indicating
940 * that there is a new element.
942 * @param op union operation
943 * @param element element to send
946 send_client_element (struct Operation *op,
947 struct GNUNET_SET_Element *element)
949 struct GNUNET_MQ_Envelope *ev;
950 struct GNUNET_SET_ResultMessage *rm;
952 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
953 GNUNET_assert (0 != op->spec->client_request_id);
954 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
957 GNUNET_MQ_discard (ev);
961 rm->result_status = htons (GNUNET_SET_STATUS_OK);
962 rm->request_id = htonl (op->spec->client_request_id);
963 rm->element_type = element->type;
964 memcpy (&rm[1], element->data, element->size);
965 GNUNET_MQ_send (op->spec->set->client_mq, ev);
970 * Signal to the client that the operation has finished and
971 * destroy the operation.
973 * @param cls operation to destroy
976 send_done_and_destroy (void *cls)
978 struct Operation *op = cls;
979 struct GNUNET_MQ_Envelope *ev;
980 struct GNUNET_SET_ResultMessage *rm;
981 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
982 rm->request_id = htonl (op->spec->client_request_id);
983 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
984 rm->element_type = htons (0);
985 GNUNET_MQ_send (op->spec->set->client_mq, ev);
986 _GSS_operation_destroy (op);
991 * Send all remaining elements in the full result iterator.
993 * @param cls operation
996 send_remaining_elements (void *cls)
998 struct Operation *op = cls;
1002 res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, NULL, (const void **) &ke);
1004 if (GNUNET_NO == res)
1006 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
1007 send_done_and_destroy (op);
1011 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending elements from key entry\n");
1015 struct GNUNET_MQ_Envelope *ev;
1016 struct GNUNET_SET_ResultMessage *rm;
1017 struct GNUNET_SET_Element *element;
1018 element = &ke->element->element;
1020 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
1021 GNUNET_assert (0 != op->spec->client_request_id);
1022 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1025 GNUNET_MQ_discard (ev);
1029 rm->result_status = htons (GNUNET_SET_STATUS_OK);
1030 rm->request_id = htonl (op->spec->client_request_id);
1031 rm->element_type = element->type;
1032 memcpy (&rm[1], element->data, element->size);
1033 if (ke->next_colliding == NULL)
1035 GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
1036 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1039 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1040 ke = ke->next_colliding;
1046 * Send a result message to the client indicating
1047 * that the operation is over.
1048 * After the result done message has been sent to the client,
1049 * destroy the evaluate operation.
1051 * @param op union operation
1054 finish_and_destroy (struct Operation *op)
1056 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1058 if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
1060 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
1061 GNUNET_assert (NULL == op->state->full_result_iter);
1062 op->state->full_result_iter =
1063 GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element);
1064 send_remaining_elements (op);
1067 send_done_and_destroy (op);
1072 * Handle an element message from a remote peer.
1074 * @param cls the union operation
1075 * @param mh the message
1078 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1080 struct Operation *op = cls;
1081 struct ElementEntry *ee;
1082 uint16_t element_size;
1084 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
1086 if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) &&
1087 (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1089 fail_union_operation (op);
1093 element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1094 ee = GNUNET_malloc (sizeof *ee + element_size);
1095 memcpy (&ee[1], &mh[1], element_size);
1096 ee->element.size = element_size;
1097 ee->element.data = &ee[1];
1098 ee->remote = GNUNET_YES;
1099 GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash);
1101 if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1103 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got existing element from peer\n");
1108 op_register_element (op, ee);
1109 /* only send results immediately if the client wants it */
1110 if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode)
1111 send_client_element (op, &ee->element);
1116 * Handle an element request from a remote peer.
1118 * @param cls the union operation
1119 * @param mh the message
1122 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1124 struct Operation *op = cls;
1125 struct IBF_Key *ibf_key;
1126 unsigned int num_keys;
1128 /* look up elements and send them */
1129 if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1132 fail_union_operation (op);
1136 num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1138 if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1141 fail_union_operation (op);
1145 ibf_key = (struct IBF_Key *) &mh[1];
1146 while (0 != num_keys--)
1148 send_elements_for_key (op, *ibf_key);
1155 * Handle a done message from a remote peer
1157 * @param cls the union operation
1158 * @param mh the message
1161 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1163 struct Operation *op = cls;
1164 struct GNUNET_MQ_Envelope *ev;
1166 if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1168 /* we got all requests, but still have to send our elements as response */
1170 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
1171 op->state->phase = PHASE_FINISHED;
1172 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1173 GNUNET_MQ_send (op->mq, ev);
1176 if (op->state->phase == PHASE_EXPECT_ELEMENTS)
1178 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
1179 op->state->phase = PHASE_FINISHED;
1180 finish_and_destroy (op);
1184 fail_union_operation (op);
1189 * Evaluate a union operation with
1192 * @param op operation to evaluate
1195 union_evaluate (struct Operation *op)
1197 op->state = GNUNET_new (struct OperationState);
1198 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1199 /* we started the operation, thus we have to send the operation request */
1200 op->state->phase = PHASE_EXPECT_SE;
1201 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating union operation");
1202 send_operation_request (op);
1207 * Accept an union operation request from a remote peer.
1208 * Only initializes the private operation state.
1210 * @param op operation that will be accepted as a union operation
1213 union_accept (struct Operation *op)
1215 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
1216 op->state = GNUNET_new (struct OperationState);
1217 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1218 /* kick off the operation */
1219 send_strata_estimator (op);
1224 * Create a new set supporting the union operation
1226 * @return the newly created set
1228 static struct SetState *
1229 union_set_create (void)
1231 struct SetState *set_state;
1233 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n");
1235 set_state = GNUNET_new (struct SetState);
1236 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1237 SE_IBF_SIZE, SE_IBF_HASH_NUM);
1243 * Add the element from the given element message to the set.
1245 * @param set_state state of the set want to add to
1246 * @param ee the element to add to the set
1249 union_add (struct SetState *set_state, struct ElementEntry *ee)
1251 strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
1256 * Remove the element given in the element message from the set.
1257 * Only marks the element as removed, so that older set operations can still exchange it.
1259 * @param set_state state of the set to remove from
1260 * @param ee set element to remove
1263 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1265 strata_estimator_remove (set_state->se, get_ibf_key (&ee->element_hash, 0));
1270 * Destroy a set that supports the union operation.
1272 * @param set_state the set to destroy
1275 union_set_destroy (struct SetState *set_state)
1277 if (NULL != set_state->se)
1279 strata_estimator_destroy (set_state->se);
1280 set_state->se = NULL;
1282 GNUNET_free (set_state);
1287 * Dispatch messages for a union operation.
1289 * @param op the state of the union evaluate operation
1290 * @param mh the received message
1291 * @return GNUNET_SYSERR if the tunnel should be disconnected,
1292 * GNUNET_OK otherwise
1295 union_handle_p2p_message (struct Operation *op,
1296 const struct GNUNET_MessageHeader *mh)
1298 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
1299 ntohs (mh->type), ntohs (mh->size));
1300 switch (ntohs (mh->type))
1302 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1303 handle_p2p_ibf (op, mh);
1305 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1306 handle_p2p_strata_estimator (op, mh);
1308 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1309 handle_p2p_elements (op, mh);
1311 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1312 handle_p2p_element_requests (op, mh);
1314 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1315 handle_p2p_done (op, mh);
1318 /* something wrong with mesh's message handlers? */
1326 union_peer_disconnect (struct Operation *op)
1328 if (PHASE_FINISHED != op->state->phase)
1330 struct GNUNET_MQ_Envelope *ev;
1331 struct GNUNET_SET_ResultMessage *msg;
1333 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1334 msg->request_id = htonl (op->spec->client_request_id);
1335 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1336 msg->element_type = htons (0);
1337 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1338 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1339 _GSS_operation_destroy (op);
1342 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1343 if (GNUNET_NO == op->state->client_done_sent)
1344 finish_and_destroy (op);
1348 const struct SetVT *
1351 static const struct SetVT union_vt = {
1352 .create = &union_set_create,
1353 .msg_handler = &union_handle_p2p_message,
1355 .remove = &union_remove,
1356 .destroy_set = &union_set_destroy,
1357 .evaluate = &union_evaluate,
1358 .accept = &union_accept,
1359 .peer_disconnect = &union_peer_disconnect,
1360 .cancel = &union_op_cancel,