2 This file is part of GNUnet
3 Copyright (C) 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file set/gnunet-service-set_union.c
22 * @brief two-peer set operations
23 * @author Florian Dold
26 #include "gnunet_util_lib.h"
27 #include "gnunet-service-set.h"
29 #include "gnunet-service-set_union_strata_estimator.h"
30 #include "gnunet-service-set_protocol.h"
35 * Number of IBFs in a strata estimator.
37 #define SE_STRATA_COUNT 32
39 * Size of the IBFs in the strata estimator.
41 #define SE_IBF_SIZE 80
43 * hash num parameter for the difference digests and strata estimators
45 #define SE_IBF_HASH_NUM 4
48 * Number of buckets that can be transmitted in one message.
50 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
53 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
54 * Choose this value so that computing the IBF is still cheaper
55 * than transmitting all values.
57 #define MAX_IBF_ORDER (16)
60 * Number of buckets used in the ibf per estimated
67 * Current phase we are in for a union operation.
69 enum UnionOperationPhase
72 * We sent the request message, and expect a strata estimator
77 * We sent the strata estimator, and expect an IBF. This phase is entered once
78 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
80 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
85 * Continuation for multi part IBFs.
87 PHASE_EXPECT_IBF_CONT,
90 * We are sending request and elements,
91 * and thus only expect elements from the other peer.
93 * We are currently decoding an IBF until it can no longer be decoded,
94 * we currently send requests and expect elements
95 * The remote peer is in #PHASE_EXPECT_ELEMENTS_AND_REQUESTS
97 PHASE_EXPECT_ELEMENTS,
100 * We are expecting elements and requests, and send
101 * requested elements back to the other peer.
103 * We are in this phase if we have SENT an IBF for the remote peer to decode.
104 * We expect requests, send elements or could receive an new IBF, which takes
105 * us via #PHASE_EXPECT_IBF to phase #PHASE_EXPECT_ELEMENTS
107 * The remote peer is thus in:
108 * #PHASE_EXPECT_ELEMENTS
110 PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
113 * The protocol is over.
114 * Results may still have to be sent to the client.
121 * State of an evaluate operation with another peer.
123 struct OperationState
127 * Copy of the set's strata estimator at the time of
128 * creation of this operation
130 struct StrataEstimator *se;
133 * The ibf we currently receive
135 struct InvertibleBloomFilter *remote_ibf;
138 * IBF of the set's element.
140 struct InvertibleBloomFilter *local_ibf;
143 * Maps IBF-Keys (specific to the current salt) to elements.
144 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
145 * Colliding IBF-Keys are linked.
147 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
150 * Iterator for sending elements on the key to element mapping to the client.
152 struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
155 * Current state of the operation.
157 enum UnionOperationPhase phase;
160 * Did we send the client that we are done?
162 int client_done_sent;
165 * Number of ibf buckets received
167 unsigned int ibf_buckets_received;
173 * The key entry is used to associate an ibf key with an element.
178 * IBF key for the entry, derived from the current salt.
180 struct IBF_Key ibf_key;
183 * The actual element associated with the key.
185 struct ElementEntry *element;
188 * Element that collides with this element
189 * on the ibf key. All colliding entries must have the same ibf key.
191 struct KeyEntry *next_colliding;
196 * Used as a closure for sending elements
197 * with a specific IBF key.
199 struct SendElementClosure
202 * The IBF key whose matching elements should be
205 struct IBF_Key ibf_key;
208 * Operation for which the elements
211 struct Operation *op;
216 * Extra state required for efficient set union.
221 * The strata estimator is only generated once for
223 * The IBF keys are derived from the element hashes with
226 struct StrataEstimator *se;
231 * Iterator over hash map entries, called to
232 * destroy the linked list of colliding ibf key entries.
235 * @param key current key code
236 * @param value value in the hash map
237 * @return #GNUNET_YES if we should continue to iterate,
241 destroy_key_to_element_iter (void *cls,
245 struct KeyEntry *k = value;
249 struct KeyEntry *k_tmp = k;
251 k = k->next_colliding;
252 if (GNUNET_YES == k_tmp->element->remote)
254 GNUNET_free (k_tmp->element);
255 k_tmp->element = NULL;
264 * Destroy the union operation. Only things specific to the union
265 * operation are destroyed.
267 * @param op union operation to destroy
270 union_op_cancel (struct Operation *op)
272 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
273 "destroying union op\n");
274 /* check if the op was canceled twice */
275 GNUNET_assert (NULL != op->state);
276 if (NULL != op->state->remote_ibf)
278 ibf_destroy (op->state->remote_ibf);
279 op->state->remote_ibf = NULL;
281 if (NULL != op->state->local_ibf)
283 ibf_destroy (op->state->local_ibf);
284 op->state->local_ibf = NULL;
286 if (NULL != op->state->se)
288 strata_estimator_destroy (op->state->se);
289 op->state->se = NULL;
291 if (NULL != op->state->key_to_element)
293 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
294 &destroy_key_to_element_iter,
296 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
297 op->state->key_to_element = NULL;
299 GNUNET_free (op->state);
301 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
302 "destroying union op done\n");
307 * Inform the client that the union operation has failed,
308 * and proceed to destroy the evaluate operation.
310 * @param op the union operation to fail
313 fail_union_operation (struct Operation *op)
315 struct GNUNET_MQ_Envelope *ev;
316 struct GNUNET_SET_ResultMessage *msg;
318 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
319 "union operation failed\n");
320 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
321 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
322 msg->request_id = htonl (op->spec->client_request_id);
323 msg->element_type = htons (0);
324 GNUNET_MQ_send (op->spec->set->client_mq, ev);
325 _GSS_operation_destroy (op, GNUNET_YES);
330 * Derive the IBF key from a hash code and
333 * @param src the hash code
334 * @param salt salt to use
335 * @return the derived IBF key
337 static struct IBF_Key
338 get_ibf_key (const struct GNUNET_HashCode *src,
343 GNUNET_CRYPTO_hkdf (&key, sizeof (key),
344 GCRY_MD_SHA512, GCRY_MD_SHA256,
346 &salt, sizeof (salt),
353 * Iterator to create the mapping between ibf keys
354 * and element entries.
357 * @param key current key code
358 * @param value value in the hash map
359 * @return #GNUNET_YES if we should continue to iterate,
363 op_register_element_iterator (void *cls,
367 struct KeyEntry *const new_k = cls;
368 struct KeyEntry *old_k = value;
370 GNUNET_assert (NULL != old_k);
371 /* check if our ibf key collides with the ibf key in the existing entry */
372 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
374 /* insert the the new key in the collision chain */
375 new_k->next_colliding = old_k->next_colliding;
376 old_k->next_colliding = new_k;
377 /* signal to the caller that we were able to insert into a colliding bucket */
385 * Iterator to create the mapping between ibf keys
386 * and element entries.
389 * @param key current key code
390 * @param value value in the hash map
391 * @return #GNUNET_YES (we should continue to iterate)
394 op_has_element_iterator (void *cls,
398 struct GNUNET_HashCode *element_hash = cls;
399 struct KeyEntry *k = value;
401 GNUNET_assert (NULL != k);
404 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
407 k = k->next_colliding;
414 * Determine whether the given element is already in the operation's element
417 * @param op operation that should be tested for 'element_hash'
418 * @param element_hash hash of the element to look for
419 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
422 op_has_element (struct Operation *op,
423 const struct GNUNET_HashCode *element_hash)
426 struct IBF_Key ibf_key;
428 ibf_key = get_ibf_key (element_hash, op->spec->salt);
429 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
430 (uint32_t) ibf_key.key_val,
431 op_has_element_iterator,
432 (void *) element_hash);
434 /* was the iteration aborted because we found the element? */
435 if (GNUNET_SYSERR == ret)
442 * Insert an element into the union operation's
443 * key-to-element mapping. Takes ownership of 'ee'.
444 * Note that this does not insert the element in the set,
445 * only in the operation's key-element mapping.
446 * This is done to speed up re-tried operations, if some elements
447 * were transmitted, and then the IBF fails to decode.
449 * @param op the union operation
450 * @param ee the element entry
453 op_register_element (struct Operation *op,
454 struct ElementEntry *ee)
457 struct IBF_Key ibf_key;
460 ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
461 k = GNUNET_new (struct KeyEntry);
463 k->ibf_key = ibf_key;
464 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
465 (uint32_t) ibf_key.key_val,
466 op_register_element_iterator,
469 /* was the element inserted into a colliding bucket? */
470 if (GNUNET_SYSERR == ret)
472 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
473 (uint32_t) ibf_key.key_val,
475 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
480 * Insert a key into an ibf.
484 * @param value the key entry to get the key from
487 prepare_ibf_iterator (void *cls,
491 struct InvertibleBloomFilter *ibf = cls;
492 struct KeyEntry *ke = value;
494 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
495 "inserting %x into ibf\n",
496 ke->ibf_key.key_val);
497 ibf_insert (ibf, ke->ibf_key);
503 * Iterator for initializing the
504 * key-to-element mapping of a union operation
506 * @param cls the union operation `struct Operation *`
508 * @param value the `struct ElementEntry *` to insert
509 * into the key-to-element mapping
510 * @return #GNUNET_YES (to continue iterating)
513 init_key_to_element_iterator (void *cls,
514 const struct GNUNET_HashCode *key,
517 struct Operation *op = cls;
518 struct ElementEntry *ee = value;
520 /* make sure that the element belongs to the set at the time
521 * of creating the operation */
522 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
525 GNUNET_assert (GNUNET_NO == ee->remote);
527 op_register_element (op, ee);
533 * Create an ibf with the operation's elements
534 * of the specified size
536 * @param op the union operation
537 * @param size size of the ibf to create
540 prepare_ibf (struct Operation *op,
543 if (NULL == op->state->key_to_element)
547 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
548 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
549 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
550 init_key_to_element_iterator, op);
552 if (NULL != op->state->local_ibf)
553 ibf_destroy (op->state->local_ibf);
554 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
555 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
556 &prepare_ibf_iterator,
557 op->state->local_ibf);
562 * Send an ibf of appropriate size.
564 * @param op the union operation
565 * @param ibf_order order of the ibf to send, size=2^order
568 send_ibf (struct Operation *op,
571 unsigned int buckets_sent = 0;
572 struct InvertibleBloomFilter *ibf;
574 prepare_ibf (op, 1<<ibf_order);
576 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
577 "sending ibf of size %u\n",
580 ibf = op->state->local_ibf;
582 while (buckets_sent < (1 << ibf_order))
584 unsigned int buckets_in_message;
585 struct GNUNET_MQ_Envelope *ev;
586 struct IBFMessage *msg;
588 buckets_in_message = (1 << ibf_order) - buckets_sent;
589 /* limit to maximum */
590 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
591 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
593 ev = GNUNET_MQ_msg_extra (msg,
594 buckets_in_message * IBF_BUCKET_SIZE,
595 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
597 msg->order = ibf_order;
598 msg->offset = htons (buckets_sent);
599 ibf_write_slice (ibf, buckets_sent,
600 buckets_in_message, &msg[1]);
601 buckets_sent += buckets_in_message;
602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
603 "ibf chunk size %u, %u/%u sent\n",
607 GNUNET_MQ_send (op->mq, ev);
610 op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
615 * Send a strata estimator to the remote peer.
617 * @param op the union operation with the remote peer
620 send_strata_estimator (struct Operation *op)
622 struct GNUNET_MQ_Envelope *ev;
623 struct GNUNET_MessageHeader *strata_msg;
625 ev = GNUNET_MQ_msg_header_extra (strata_msg,
626 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
627 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE);
628 strata_estimator_write (op->state->se, &strata_msg[1]);
629 GNUNET_MQ_send (op->mq,
631 op->state->phase = PHASE_EXPECT_IBF;
632 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
633 "sent SE, expecting IBF\n");
638 * Compute the necessary order of an ibf
639 * from the size of the symmetric set difference.
641 * @param diff the difference
642 * @return the required size of the ibf
645 get_order_from_difference (unsigned int diff)
647 unsigned int ibf_order;
650 while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
651 ((1<<ibf_order) < SE_IBF_HASH_NUM) )
653 if (ibf_order > MAX_IBF_ORDER)
654 ibf_order = MAX_IBF_ORDER;
660 * Handle a strata estimator from a remote peer
662 * @param cls the union operation
663 * @param mh the message
664 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
665 * #GNUNET_OK otherwise
668 handle_p2p_strata_estimator (void *cls,
669 const struct GNUNET_MessageHeader *mh)
671 struct Operation *op = cls;
672 struct StrataEstimator *remote_se;
675 if (op->state->phase != PHASE_EXPECT_SE)
677 fail_union_operation (op);
679 return GNUNET_SYSERR;
681 if (ntohs (mh->size) !=
682 SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE +
683 sizeof (struct GNUNET_MessageHeader))
685 fail_union_operation (op);
687 return GNUNET_SYSERR;
689 remote_se = strata_estimator_create (SE_STRATA_COUNT,
692 strata_estimator_read (&mh[1], remote_se);
693 GNUNET_assert (NULL != op->state->se);
694 diff = strata_estimator_difference (remote_se,
696 strata_estimator_destroy (remote_se);
697 strata_estimator_destroy (op->state->se);
698 op->state->se = NULL;
699 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
700 "got se diff=%d, using ibf size %d\n",
702 1<<get_order_from_difference (diff));
704 get_order_from_difference (diff));
710 * Iterator to send elements to a remote peer
712 * @param cls closure with the element key and the union operation
714 * @param value the key entry
717 send_element_iterator (void *cls,
721 struct SendElementClosure *sec = cls;
722 struct IBF_Key ibf_key = sec->ibf_key;
723 struct Operation *op = sec->op;
724 struct KeyEntry *ke = value;
726 if (ke->ibf_key.key_val != ibf_key.key_val)
730 const struct GNUNET_SET_Element *const element = &ke->element->element;
731 struct GNUNET_MQ_Envelope *ev;
732 struct GNUNET_MessageHeader *mh;
734 GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
735 ev = GNUNET_MQ_msg_header_extra (mh,
737 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
740 /* element too large */
747 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
748 "sending element (%s) to peer\n",
749 GNUNET_h2s (&ke->element->element_hash));
750 GNUNET_MQ_send (op->mq, ev);
751 ke = ke->next_colliding;
758 * Send all elements that have the specified IBF key
759 * to the remote peer of the union operation
761 * @param op union operation
762 * @param ibf_key IBF key of interest
765 send_elements_for_key (struct Operation *op,
766 struct IBF_Key ibf_key)
768 struct SendElementClosure send_cls;
770 send_cls.ibf_key = ibf_key;
772 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
773 (uint32_t) ibf_key.key_val,
774 &send_element_iterator,
780 * Decode which elements are missing on each side, and
781 * send the appropriate elemens and requests
783 * @param op union operation
786 decode_and_send (struct Operation *op)
789 struct IBF_Key last_key;
791 unsigned int num_decoded;
792 struct InvertibleBloomFilter *diff_ibf;
794 GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase);
796 prepare_ibf (op, op->state->remote_ibf->size);
797 diff_ibf = ibf_dup (op->state->local_ibf);
798 ibf_subtract (diff_ibf, op->state->remote_ibf);
800 ibf_destroy (op->state->remote_ibf);
801 op->state->remote_ibf = NULL;
803 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
804 "decoding IBF (size=%u)\n",
808 last_key.key_val = 0;
813 int cycle_detected = GNUNET_NO;
817 res = ibf_decode (diff_ibf, &side, &key);
818 if (res == GNUNET_OK)
820 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
821 "decoded ibf key %lx\n",
824 if ( (num_decoded > diff_ibf->size) ||
825 (num_decoded > 1 && last_key.key_val == key.key_val) )
827 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
828 "detected cyclic ibf (decoded %u/%u)\n",
831 cycle_detected = GNUNET_YES;
834 if ( (GNUNET_SYSERR == res) ||
835 (GNUNET_YES == cycle_detected) )
839 while (1<<next_order < diff_ibf->size)
842 if (next_order <= MAX_IBF_ORDER)
844 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
845 "decoding failed, sending larger ibf (size %u)\n",
847 send_ibf (op, next_order);
851 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
852 "set union failed: reached ibf limit\n");
856 if (GNUNET_NO == res)
858 struct GNUNET_MQ_Envelope *ev;
860 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
861 "transmitted all values, sending DONE\n");
862 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
863 GNUNET_MQ_send (op->mq, ev);
868 send_elements_for_key (op, key);
872 struct GNUNET_MQ_Envelope *ev;
873 struct GNUNET_MessageHeader *msg;
875 /* It may be nice to merge multiple requests, but with cadet's corking it is not worth
876 * the effort additional complexity. */
877 ev = GNUNET_MQ_msg_header_extra (msg,
878 sizeof (struct IBF_Key),
879 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
883 sizeof (struct IBF_Key));
884 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
885 "sending element request\n");
886 GNUNET_MQ_send (op->mq, ev);
893 ibf_destroy (diff_ibf);
898 * Handle an IBF message from a remote peer.
900 * @param cls the union operation
901 * @param mh the header of the message
902 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
903 * #GNUNET_OK otherwise
906 handle_p2p_ibf (void *cls,
907 const struct GNUNET_MessageHeader *mh)
909 struct Operation *op = cls;
910 const struct IBFMessage *msg;
911 unsigned int buckets_in_message;
913 if (ntohs (mh->size) < sizeof (struct IBFMessage))
916 fail_union_operation (op);
917 return GNUNET_SYSERR;
919 msg = (const struct IBFMessage *) mh;
920 if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
921 (op->state->phase == PHASE_EXPECT_IBF) )
923 op->state->phase = PHASE_EXPECT_IBF_CONT;
924 GNUNET_assert (NULL == op->state->remote_ibf);
925 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
926 "Creating new ibf of size %u\n",
928 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
929 op->state->ibf_buckets_received = 0;
930 if (0 != ntohs (msg->offset))
933 fail_union_operation (op);
934 return GNUNET_SYSERR;
937 else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
939 if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
940 (1<<msg->order != op->state->remote_ibf->size) )
943 fail_union_operation (op);
944 return GNUNET_SYSERR;
948 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
950 if (0 == buckets_in_message)
953 fail_union_operation (op);
954 return GNUNET_SYSERR;
957 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
960 fail_union_operation (op);
961 return GNUNET_SYSERR;
964 ibf_read_slice (&msg[1],
965 op->state->ibf_buckets_received,
967 op->state->remote_ibf);
968 op->state->ibf_buckets_received += buckets_in_message;
970 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
972 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
973 "received full ibf\n");
974 op->state->phase = PHASE_EXPECT_ELEMENTS;
975 decode_and_send (op);
982 * Send a result message to the client indicating
983 * that there is a new element.
985 * @param op union operation
986 * @param element element to send
989 send_client_element (struct Operation *op,
990 struct GNUNET_SET_Element *element)
992 struct GNUNET_MQ_Envelope *ev;
993 struct GNUNET_SET_ResultMessage *rm;
995 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
996 "sending element (size %u) to client\n",
998 GNUNET_assert (0 != op->spec->client_request_id);
999 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1002 GNUNET_MQ_discard (ev);
1007 if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode)
1008 rm->result_status = htons (GNUNET_SET_STATUS_OK);
1009 else if (GNUNET_SET_RESULT_SYMMETRIC == op->spec->result_mode)
1010 rm->result_status = htons (GNUNET_SET_STATUS_ADD_LOCAL);
1012 rm->request_id = htonl (op->spec->client_request_id);
1013 rm->element_type = element->element_type;
1014 memcpy (&rm[1], element->data, element->size);
1015 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1020 * Signal to the client that the operation has finished and
1021 * destroy the operation.
1023 * @param cls operation to destroy
1026 send_done_and_destroy (void *cls)
1028 struct Operation *op = cls;
1029 struct GNUNET_MQ_Envelope *ev;
1030 struct GNUNET_SET_ResultMessage *rm;
1032 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1033 rm->request_id = htonl (op->spec->client_request_id);
1034 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1035 rm->element_type = htons (0);
1036 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1037 _GSS_operation_destroy (op, GNUNET_YES);
1045 * Send all remaining elements in the full result iterator.
1047 * @param cls operation
1050 send_remaining_elements (void *cls)
1052 struct Operation *op = cls;
1053 struct KeyEntry *ke;
1056 res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter,
1058 (const void **) &ke);
1059 if (GNUNET_NO == res)
1061 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1062 "sending done and destroy because iterator ran out\n");
1063 send_done_and_destroy (op);
1066 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1067 "sending elements from key entry\n");
1070 struct GNUNET_MQ_Envelope *ev;
1071 struct GNUNET_SET_ResultMessage *rm;
1072 struct GNUNET_SET_Element *element;
1074 element = &ke->element->element;
1075 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1076 "sending element (size %u) to client (full set)\n",
1078 GNUNET_assert (0 != op->spec->client_request_id);
1079 ev = GNUNET_MQ_msg_extra (rm,
1081 GNUNET_MESSAGE_TYPE_SET_RESULT);
1084 GNUNET_MQ_discard (ev);
1088 rm->result_status = htons (GNUNET_SET_STATUS_OK);
1089 rm->request_id = htonl (op->spec->client_request_id);
1090 rm->element_type = element->element_type;
1091 memcpy (&rm[1], element->data, element->size);
1092 if (NULL == ke->next_colliding)
1094 GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
1095 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1098 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1099 ke = ke->next_colliding;
1105 * Send a result message to the client indicating
1106 * that the operation is over.
1107 * After the result done message has been sent to the client,
1108 * destroy the evaluate operation.
1110 * @param op union operation
1113 finish_and_destroy (struct Operation *op)
1115 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1117 if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
1119 /* prevent that the op is free'd by the tunnel end handler */
1120 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1121 "sending full result set\n");
1122 GNUNET_assert (NULL == op->state->full_result_iter);
1123 op->state->full_result_iter =
1124 GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element);
1125 send_remaining_elements (op);
1128 send_done_and_destroy (op);
1133 * Handle an element message from a remote peer.
1135 * @param cls the union operation
1136 * @param mh the message
1139 handle_p2p_elements (void *cls,
1140 const struct GNUNET_MessageHeader *mh)
1142 struct Operation *op = cls;
1143 struct ElementEntry *ee;
1144 uint16_t element_size;
1146 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1147 "Got element from peer\n");
1148 if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) &&
1149 (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1151 fail_union_operation (op);
1152 GNUNET_break_op (0);
1155 element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1156 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1157 memcpy (&ee[1], &mh[1], element_size);
1158 ee->element.size = element_size;
1159 ee->element.data = &ee[1];
1160 ee->remote = GNUNET_YES;
1161 GNUNET_CRYPTO_hash (ee->element.data,
1165 if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1167 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1168 "got existing element from peer\n");
1173 op_register_element (op, ee);
1174 /* only send results immediately if the client wants it */
1175 if (GNUNET_SET_RESULT_FULL != op->spec->result_mode)
1176 send_client_element (op, &ee->element);
1181 * Handle an element request from a remote peer.
1183 * @param cls the union operation
1184 * @param mh the message
1187 handle_p2p_element_requests (void *cls,
1188 const struct GNUNET_MessageHeader *mh)
1190 struct Operation *op = cls;
1191 const struct IBF_Key *ibf_key;
1192 unsigned int num_keys;
1194 /* look up elements and send them */
1195 if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1197 GNUNET_break_op (0);
1198 fail_union_operation (op);
1201 num_keys = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1202 / sizeof (struct IBF_Key);
1203 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1204 != num_keys * sizeof (struct IBF_Key))
1206 GNUNET_break_op (0);
1207 fail_union_operation (op);
1211 ibf_key = (const struct IBF_Key *) &mh[1];
1212 while (0 != num_keys--)
1214 send_elements_for_key (op, *ibf_key);
1221 * Handle a done message from a remote peer
1223 * @param cls the union operation
1224 * @param mh the message
1227 handle_p2p_done (void *cls,
1228 const struct GNUNET_MessageHeader *mh)
1230 struct Operation *op = cls;
1231 struct GNUNET_MQ_Envelope *ev;
1233 if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1235 /* we got all requests, but still have to send our elements as response */
1237 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1238 "got DONE, sending final DONE after elements\n");
1239 op->state->phase = PHASE_FINISHED;
1240 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1241 GNUNET_MQ_send (op->mq, ev);
1244 if (op->state->phase == PHASE_EXPECT_ELEMENTS)
1246 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1247 "got final DONE\n");
1248 op->state->phase = PHASE_FINISHED;
1249 finish_and_destroy (op);
1252 GNUNET_break_op (0);
1253 fail_union_operation (op);
1258 * Initiate operation to evaluate a set union with a remote peer.
1260 * @param op operation to perform (to be initialized)
1261 * @param opaque_context message to be transmitted to the listener
1262 * to convince him to accept, may be NULL
1265 union_evaluate (struct Operation *op,
1266 const struct GNUNET_MessageHeader *opaque_context)
1268 struct GNUNET_MQ_Envelope *ev;
1269 struct OperationRequestMessage *msg;
1271 op->state = GNUNET_new (struct OperationState);
1272 /* copy the current generation's strata estimator for this operation */
1273 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1274 /* we started the operation, thus we have to send the operation request */
1275 op->state->phase = PHASE_EXPECT_SE;
1276 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1277 "Initiating union operation evaluation\n");
1278 ev = GNUNET_MQ_msg_nested_mh (msg,
1279 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1283 /* the context message is too large */
1285 GNUNET_SERVER_client_disconnect (op->spec->set->client);
1288 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1289 msg->app_id = op->spec->app_id;
1290 GNUNET_MQ_send (op->mq,
1293 if (NULL != opaque_context)
1294 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1295 "sent op request with context message\n");
1297 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1298 "sent op request without context message\n");
1303 * Accept an union operation request from a remote peer.
1304 * Only initializes the private operation state.
1306 * @param op operation that will be accepted as a union operation
1309 union_accept (struct Operation *op)
1311 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1312 "accepting set union operation\n");
1313 op->state = GNUNET_new (struct OperationState);
1314 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1315 /* kick off the operation */
1316 send_strata_estimator (op);
1321 * Create a new set supporting the union operation
1323 * We maintain one strata estimator per set and then manipulate it over the
1324 * lifetime of the set, as recreating a strata estimator would be expensive.
1326 * @return the newly created set
1328 static struct SetState *
1329 union_set_create (void)
1331 struct SetState *set_state;
1333 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1334 "union set created\n");
1335 set_state = GNUNET_new (struct SetState);
1336 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1337 SE_IBF_SIZE, SE_IBF_HASH_NUM);
1343 * Add the element from the given element message to the set.
1345 * @param set_state state of the set want to add to
1346 * @param ee the element to add to the set
1349 union_add (struct SetState *set_state, struct ElementEntry *ee)
1351 strata_estimator_insert (set_state->se,
1352 get_ibf_key (&ee->element_hash, 0));
1357 * Remove the element given in the element message from the set.
1358 * Only marks the element as removed, so that older set operations can still exchange it.
1360 * @param set_state state of the set to remove from
1361 * @param ee set element to remove
1364 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1366 strata_estimator_remove (set_state->se,
1367 get_ibf_key (&ee->element_hash, 0));
1372 * Destroy a set that supports the union operation.
1374 * @param set_state the set to destroy
1377 union_set_destroy (struct SetState *set_state)
1379 if (NULL != set_state->se)
1381 strata_estimator_destroy (set_state->se);
1382 set_state->se = NULL;
1384 GNUNET_free (set_state);
1389 * Dispatch messages for a union operation.
1391 * @param op the state of the union evaluate operation
1392 * @param mh the received message
1393 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1394 * #GNUNET_OK otherwise
1397 union_handle_p2p_message (struct Operation *op,
1398 const struct GNUNET_MessageHeader *mh)
1400 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1401 "received p2p message (t: %u, s: %u)\n",
1404 switch (ntohs (mh->type))
1406 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1407 return handle_p2p_ibf (op, mh);
1408 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1409 return handle_p2p_strata_estimator (op, mh);
1410 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1411 handle_p2p_elements (op, mh);
1413 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1414 handle_p2p_element_requests (op, mh);
1416 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
1417 handle_p2p_done (op, mh);
1420 /* something wrong with cadet's message handlers? */
1427 * handler for peer-disconnects, notifies the client
1428 * about the aborted operation in case the op was not concluded
1430 * @param op the destroyed operation
1433 union_peer_disconnect (struct Operation *op)
1435 if (PHASE_FINISHED != op->state->phase)
1437 struct GNUNET_MQ_Envelope *ev;
1438 struct GNUNET_SET_ResultMessage *msg;
1440 ev = GNUNET_MQ_msg (msg,
1441 GNUNET_MESSAGE_TYPE_SET_RESULT);
1442 msg->request_id = htonl (op->spec->client_request_id);
1443 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1444 msg->element_type = htons (0);
1445 GNUNET_MQ_send (op->spec->set->client_mq,
1447 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1448 "other peer disconnected prematurely\n");
1449 _GSS_operation_destroy (op,
1453 // else: the session has already been concluded
1454 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1455 "other peer disconnected (finished)\n");
1456 if (GNUNET_NO == op->state->client_done_sent)
1457 finish_and_destroy (op);
1460 static struct SetState *
1461 union_copy_state (struct Set *set)
1463 struct SetState *new_state;
1465 new_state = GNUNET_new (struct SetState);
1466 GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
1467 new_state->se = strata_estimator_dup (set->state->se);
1473 * Get the table with implementing functions for
1476 * @return the operation specific VTable
1478 const struct SetVT *
1481 static const struct SetVT union_vt = {
1482 .create = &union_set_create,
1483 .msg_handler = &union_handle_p2p_message,
1485 .remove = &union_remove,
1486 .destroy_set = &union_set_destroy,
1487 .evaluate = &union_evaluate,
1488 .accept = &union_accept,
1489 .peer_disconnect = &union_peer_disconnect,
1490 .cancel = &union_op_cancel,
1491 .copy_state = &union_copy_state,