2 This file is part of GNUnet
3 Copyright (C) 2013-2016 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file set/gnunet-service-set_union.c
23 * @brief two-peer set operations
24 * @author Florian Dold
27 #include "gnunet_util_lib.h"
28 #include "gnunet_statistics_service.h"
29 #include "gnunet-service-set.h"
31 #include "gnunet-service-set_union_strata_estimator.h"
32 #include "gnunet-service-set_protocol.h"
36 #define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
40 * Number of IBFs in a strata estimator.
42 #define SE_STRATA_COUNT 32
45 * Size of the IBFs in the strata estimator.
47 #define SE_IBF_SIZE 80
50 * The hash num parameter for the difference digests and strata estimators.
52 #define SE_IBF_HASH_NUM 4
55 * Number of buckets that can be transmitted in one message.
57 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
60 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
61 * Choose this value so that computing the IBF is still cheaper
62 * than transmitting all values.
64 #define MAX_IBF_ORDER (20)
67 * Number of buckets used in the ibf per estimated
74 * Current phase we are in for a union operation.
76 enum UnionOperationPhase
79 * We sent the request message, and expect a strata estimator.
84 * We sent the strata estimator, and expect an IBF. This phase is entered once
85 * after sending the strata estimator; later IBFs arrive in #PHASE_INVENTORY_PASSIVE.
87 * XXX: could use better wording.
89 * After receiving the complete IBF, we enter #PHASE_INVENTORY_ACTIVE
94 * Continuation for multi part IBFs.
96 PHASE_EXPECT_IBF_CONT,
99 * We are decoding an IBF.
101 PHASE_INVENTORY_ACTIVE,
104 * The other peer is decoding the IBF we just sent.
106 PHASE_INVENTORY_PASSIVE,
109 * The protocol is almost finished, but we still have to flush our message
110 * queue and/or expect some elements.
112 PHASE_FINISH_CLOSING,
115 * In the penultimate phase,
116 * we wait until all our demands
117 * are satisfied. Then we send a done
118 * message, and wait for another done message.*/
119 PHASE_FINISH_WAITING,
122 * In the ultimate phase, we wait until
123 * our demands are satisfied and then
124 * quit (sending another DONE message). */
130 * State of an evaluate operation with another peer.
132 struct OperationState
135 * Copy of the set's strata estimator at the time of
136 * creation of this operation.
138 struct StrataEstimator *se;
141 * The IBF we currently receive.
143 struct InvertibleBloomFilter *remote_ibf;
146 * The IBF with the local set's element.
148 struct InvertibleBloomFilter *local_ibf;
151 * Maps IBF-Keys (specific to the current salt) to elements.
152 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
153 * Colliding IBF-Keys are linked.
155 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
158 * Current state of the operation.
160 enum UnionOperationPhase phase;
163 * Did we send the client that we are done?
165 int client_done_sent;
168 * Number of ibf buckets already received into the @a remote_ibf.
170 unsigned int ibf_buckets_received;
173 * Hashes for elements that we have demanded from the other peer.
175 struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
178 * Salt that we're using for sending IBFs
183 * Salt for the IBF we've received and that we're currently decoding.
185 uint32_t salt_receive;
188 * Number of elements we received from the other peer
189 * that were not in the local set yet.
191 uint32_t received_fresh;
194 * Total number of elements received from the other peer.
196 uint32_t received_total;
201 * The key entry is used to associate an ibf key with an element.
206 * IBF key for the entry, derived from the current salt.
208 struct IBF_Key ibf_key;
211 * The actual element associated with the key.
213 * Only owned by the union operation if element->operation
216 struct ElementEntry *element;
221 * Used as a closure for sending elements
222 * with a specific IBF key.
224 struct SendElementClosure
227 * The IBF key whose matching elements should be
230 struct IBF_Key ibf_key;
233 * Operation for which the elements
236 struct Operation *op;
241 * Extra state required for efficient set union.
246 * The strata estimator is only generated once for
248 * The IBF keys are derived from the element hashes with
251 struct StrataEstimator *se;
256 * Iterator over hash map entries, called to
257 * destroy the linked list of colliding ibf key entries.
260 * @param key current key code
261 * @param value value in the hash map
262 * @return #GNUNET_YES if we should continue to iterate,
266 destroy_key_to_element_iter (void *cls,
270 struct KeyEntry *k = value;
272 GNUNET_assert (NULL != k);
273 if (GNUNET_YES == k->element->remote)
275 GNUNET_free (k->element);
284 * Destroy the union operation. Only things specific to the union
285 * operation are destroyed.
287 * @param op union operation to destroy
290 union_op_cancel (struct Operation *op)
292 LOG (GNUNET_ERROR_TYPE_DEBUG,
293 "destroying union op\n");
294 /* check if the op was canceled twice */
295 GNUNET_assert (NULL != op->state);
296 if (NULL != op->state->remote_ibf)
298 ibf_destroy (op->state->remote_ibf);
299 op->state->remote_ibf = NULL;
301 if (NULL != op->state->demanded_hashes)
303 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
304 op->state->demanded_hashes = NULL;
306 if (NULL != op->state->local_ibf)
308 ibf_destroy (op->state->local_ibf);
309 op->state->local_ibf = NULL;
311 if (NULL != op->state->se)
313 strata_estimator_destroy (op->state->se);
314 op->state->se = NULL;
316 if (NULL != op->state->key_to_element)
318 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
319 &destroy_key_to_element_iter,
321 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
322 op->state->key_to_element = NULL;
324 GNUNET_free (op->state);
326 LOG (GNUNET_ERROR_TYPE_DEBUG,
327 "destroying union op done\n");
332 * Inform the client that the union operation has failed,
333 * and proceed to destroy the evaluate operation.
335 * @param op the union operation to fail
338 fail_union_operation (struct Operation *op)
340 struct GNUNET_MQ_Envelope *ev;
341 struct GNUNET_SET_ResultMessage *msg;
343 LOG (GNUNET_ERROR_TYPE_ERROR,
344 "union operation failed\n");
345 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
346 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
347 msg->request_id = htonl (op->spec->client_request_id);
348 msg->element_type = htons (0);
349 GNUNET_MQ_send (op->spec->set->client_mq, ev);
350 _GSS_operation_destroy (op, GNUNET_YES);
355 * Derive the IBF key from a hash code and
358 * @param src the hash code
359 * @return the derived IBF key
361 static struct IBF_Key
362 get_ibf_key (const struct GNUNET_HashCode *src)
367 GNUNET_CRYPTO_kdf (&key, sizeof (key),
369 &salt, sizeof (salt),
376 * Iterator over the mapping from IBF keys to element entries. Checks if we
377 * have an element with a given GNUNET_HashCode.
380 * @param key current key code
381 * @param value value in the hash map
382 * @return #GNUNET_YES if we should search further,
383 * #GNUNET_NO if we've found the element.
386 op_has_element_iterator (void *cls,
390 struct GNUNET_HashCode *element_hash = cls;
391 struct KeyEntry *k = value;
393 GNUNET_assert (NULL != k);
394 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
402 * Determine whether the given element is already in the operation's element
405 * @param op operation that should be tested for 'element_hash'
406 * @param element_hash hash of the element to look for
407 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
410 op_has_element (struct Operation *op,
411 const struct GNUNET_HashCode *element_hash)
414 struct IBF_Key ibf_key;
416 ibf_key = get_ibf_key (element_hash);
417 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
418 (uint32_t) ibf_key.key_val,
419 op_has_element_iterator,
420 (void *) element_hash);
422 /* was the iteration aborted because we found the element? */
423 if (GNUNET_SYSERR == ret)
430 * Insert an element into the union operation's
431 * key-to-element mapping. Takes ownership of 'ee'.
432 * Note that this does not insert the element in the set,
433 * only in the operation's key-element mapping.
434 * This is done to speed up re-tried operations, if some elements
435 * were transmitted, and then the IBF fails to decode.
437 * XXX: clarify ownership, doesn't sound right.
439 * @param op the union operation
440 * @param ee the element entry
443 op_register_element (struct Operation *op,
444 struct ElementEntry *ee)
446 struct IBF_Key ibf_key;
449 ibf_key = get_ibf_key (&ee->element_hash);
450 k = GNUNET_new (struct KeyEntry);
452 k->ibf_key = ibf_key;
453 GNUNET_assert (GNUNET_OK ==
454 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
455 (uint32_t) ibf_key.key_val,
457 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
462 salt_key (const struct IBF_Key *k_in,
464 struct IBF_Key *k_out)
467 uint64_t x = k_in->key_val;
469 x = (x >> s) | (x << (64 - s));
475 unsalt_key (const struct IBF_Key *k_in,
477 struct IBF_Key *k_out)
480 uint64_t x = k_in->key_val;
481 x = (x << s) | (x >> (64 - s));
487 * Insert a key into an ibf.
491 * @param value the key entry to get the key from
494 prepare_ibf_iterator (void *cls,
498 struct Operation *op = cls;
499 struct KeyEntry *ke = value;
500 struct IBF_Key salted_key;
502 LOG (GNUNET_ERROR_TYPE_DEBUG,
503 "[OP %x] inserting %lx (hash %s) into ibf\n",
505 (unsigned long) ke->ibf_key.key_val,
506 GNUNET_h2s (&ke->element->element_hash));
507 salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
508 ibf_insert (op->state->local_ibf, salted_key);
514 * Iterator for initializing the
515 * key-to-element mapping of a union operation
517 * @param cls the union operation `struct Operation *`
519 * @param value the `struct ElementEntry *` to insert
520 * into the key-to-element mapping
521 * @return #GNUNET_YES (to continue iterating)
524 init_key_to_element_iterator (void *cls,
525 const struct GNUNET_HashCode *key,
528 struct Operation *op = cls;
529 struct ElementEntry *ee = value;
531 /* make sure that the element belongs to the set at the time
532 * of creating the operation */
533 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
536 GNUNET_assert (GNUNET_NO == ee->remote);
538 op_register_element (op, ee);
544 * Create an ibf with the operation's elements
545 * of the specified size
547 * @param op the union operation
548 * @param size size of the ibf to create
549 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
552 prepare_ibf (struct Operation *op,
555 if (NULL == op->state->key_to_element)
559 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
560 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
561 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
562 init_key_to_element_iterator, op);
564 if (NULL != op->state->local_ibf)
565 ibf_destroy (op->state->local_ibf);
566 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
567 if (NULL == op->state->local_ibf)
569 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
570 "Failed to allocate local IBF\n");
571 return GNUNET_SYSERR;
573 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
574 &prepare_ibf_iterator,
581 * Send an ibf of appropriate size.
583 * Fragments the IBF into multiple messages if necessary.
585 * @param op the union operation
586 * @param ibf_order order of the ibf to send, size=2^order
587 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
590 send_ibf (struct Operation *op,
593 unsigned int buckets_sent = 0;
594 struct InvertibleBloomFilter *ibf;
597 prepare_ibf (op, 1<<ibf_order))
599 /* allocation failed */
600 return GNUNET_SYSERR;
603 LOG (GNUNET_ERROR_TYPE_DEBUG,
604 "sending ibf of size %u\n",
608 char name[64] = { 0 };
609 snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
610 GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
613 ibf = op->state->local_ibf;
615 while (buckets_sent < (1 << ibf_order))
617 unsigned int buckets_in_message;
618 struct GNUNET_MQ_Envelope *ev;
619 struct IBFMessage *msg;
621 buckets_in_message = (1 << ibf_order) - buckets_sent;
622 /* limit to maximum */
623 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
624 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
626 ev = GNUNET_MQ_msg_extra (msg,
627 buckets_in_message * IBF_BUCKET_SIZE,
628 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
631 msg->order = ibf_order;
632 msg->offset = htonl (buckets_sent);
633 msg->salt = htonl (op->state->salt_send);
634 ibf_write_slice (ibf, buckets_sent,
635 buckets_in_message, &msg[1]);
636 buckets_sent += buckets_in_message;
637 LOG (GNUNET_ERROR_TYPE_DEBUG,
638 "ibf chunk size %u, %u/%u sent\n",
642 GNUNET_MQ_send (op->mq, ev);
645 /* The other peer must decode the IBF, so
647 op->state->phase = PHASE_INVENTORY_PASSIVE;
653 * Send a strata estimator to the remote peer.
655 * @param op the union operation with the remote peer
658 send_strata_estimator (struct Operation *op)
660 const struct StrataEstimator *se = op->state->se;
661 struct GNUNET_MQ_Envelope *ev;
662 struct GNUNET_MessageHeader *strata_msg;
667 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
668 len = strata_estimator_write (op->state->se,
670 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
671 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
673 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
674 ev = GNUNET_MQ_msg_header_extra (strata_msg,
677 GNUNET_memcpy (&strata_msg[1],
681 GNUNET_MQ_send (op->mq,
683 op->state->phase = PHASE_EXPECT_IBF;
684 LOG (GNUNET_ERROR_TYPE_DEBUG,
685 "sent SE, expecting IBF\n");
690 * Compute the necessary order of an ibf
691 * from the size of the symmetric set difference.
693 * @param diff the difference
694 * @return the required size of the ibf
697 get_order_from_difference (unsigned int diff)
699 unsigned int ibf_order;
702 while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
703 ((1<<ibf_order) < SE_IBF_HASH_NUM) )
705 if (ibf_order > MAX_IBF_ORDER)
706 ibf_order = MAX_IBF_ORDER;
712 * Handle a strata estimator from a remote peer
714 * @param cls the union operation
715 * @param mh the message
716 * @param is_compressed #GNUNET_YES if the estimator is compressed
717 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
718 * #GNUNET_OK otherwise
721 handle_p2p_strata_estimator (void *cls,
722 const struct GNUNET_MessageHeader *mh,
725 struct Operation *op = cls;
726 struct StrataEstimator *remote_se;
730 GNUNET_STATISTICS_update (_GSS_statistics,
731 "# bytes of SE received",
735 if (op->state->phase != PHASE_EXPECT_SE)
737 fail_union_operation (op);
739 return GNUNET_SYSERR;
741 len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
742 if ( (GNUNET_NO == is_compressed) &&
743 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
745 fail_union_operation (op);
747 return GNUNET_SYSERR;
749 remote_se = strata_estimator_create (SE_STRATA_COUNT,
752 if (NULL == remote_se)
754 /* insufficient resources, fail */
755 fail_union_operation (op);
756 return GNUNET_SYSERR;
759 strata_estimator_read (&mh[1],
764 /* decompression failed */
765 fail_union_operation (op);
766 strata_estimator_destroy (remote_se);
767 return GNUNET_SYSERR;
769 GNUNET_assert (NULL != op->state->se);
770 diff = strata_estimator_difference (remote_se,
772 strata_estimator_destroy (remote_se);
773 strata_estimator_destroy (op->state->se);
774 op->state->se = NULL;
775 LOG (GNUNET_ERROR_TYPE_DEBUG,
776 "got se diff=%d, using ibf size %d\n",
778 1<<get_order_from_difference (diff));
781 get_order_from_difference (diff)))
783 /* Internal error, best we can do is shut the connection */
784 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
785 "Failed to send IBF, closing connection\n");
786 fail_union_operation (op);
787 return GNUNET_SYSERR;
794 * Iterator to send elements to a remote peer
796 * @param cls closure with the element key and the union operation
798 * @param value the key entry
801 send_offers_iterator (void *cls,
805 struct SendElementClosure *sec = cls;
806 struct Operation *op = sec->op;
807 struct KeyEntry *ke = value;
808 struct GNUNET_MQ_Envelope *ev;
809 struct GNUNET_MessageHeader *mh;
811 /* Detect 32-bit key collision for the 64-bit IBF keys. */
812 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
815 ev = GNUNET_MQ_msg_header_extra (mh,
816 sizeof (struct GNUNET_HashCode),
817 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
819 GNUNET_assert (NULL != ev);
820 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
821 LOG (GNUNET_ERROR_TYPE_DEBUG,
822 "[OP %x] sending element offer (%s) to peer\n",
824 GNUNET_h2s (&ke->element->element_hash));
825 GNUNET_MQ_send (op->mq, ev);
831 * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
833 * @param op union operation
834 * @param ibf_key IBF key of interest
837 send_offers_for_key (struct Operation *op,
838 struct IBF_Key ibf_key)
840 struct SendElementClosure send_cls;
842 send_cls.ibf_key = ibf_key;
844 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
845 (uint32_t) ibf_key.key_val,
846 &send_offers_iterator,
852 * Decode which elements are missing on each side, and
853 * send the appropriate offers and inquiries.
855 * @param op union operation
856 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
859 decode_and_send (struct Operation *op)
862 struct IBF_Key last_key;
864 unsigned int num_decoded;
865 struct InvertibleBloomFilter *diff_ibf;
867 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
870 prepare_ibf (op, op->state->remote_ibf->size))
873 /* allocation failed */
874 return GNUNET_SYSERR;
876 diff_ibf = ibf_dup (op->state->local_ibf);
877 ibf_subtract (diff_ibf, op->state->remote_ibf);
879 ibf_destroy (op->state->remote_ibf);
880 op->state->remote_ibf = NULL;
882 LOG (GNUNET_ERROR_TYPE_DEBUG,
883 "decoding IBF (size=%u)\n",
887 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
892 int cycle_detected = GNUNET_NO;
896 res = ibf_decode (diff_ibf, &side, &key);
897 if (res == GNUNET_OK)
899 LOG (GNUNET_ERROR_TYPE_DEBUG,
900 "decoded ibf key %lx\n",
901 (unsigned long) key.key_val);
903 if ( (num_decoded > diff_ibf->size) ||
904 ( (num_decoded > 1) &&
905 (last_key.key_val == key.key_val) ) )
907 LOG (GNUNET_ERROR_TYPE_DEBUG,
908 "detected cyclic ibf (decoded %u/%u)\n",
911 cycle_detected = GNUNET_YES;
914 if ( (GNUNET_SYSERR == res) ||
915 (GNUNET_YES == cycle_detected) )
919 while (1<<next_order < diff_ibf->size)
922 if (next_order <= MAX_IBF_ORDER)
924 LOG (GNUNET_ERROR_TYPE_DEBUG,
925 "decoding failed, sending larger ibf (size %u)\n",
927 GNUNET_STATISTICS_update (_GSS_statistics,
931 op->state->salt_send++;
933 send_ibf (op, next_order))
935 /* Internal error, best we can do is shut the connection */
936 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
937 "Failed to send IBF, closing connection\n");
938 fail_union_operation (op);
939 ibf_destroy (diff_ibf);
940 return GNUNET_SYSERR;
945 GNUNET_STATISTICS_update (_GSS_statistics,
946 "# of failed union operations (too large)",
949 // XXX: Send the whole set, element-by-element
950 LOG (GNUNET_ERROR_TYPE_ERROR,
951 "set union failed: reached ibf limit\n");
952 fail_union_operation (op);
953 ibf_destroy (diff_ibf);
954 return GNUNET_SYSERR;
958 if (GNUNET_NO == res)
960 struct GNUNET_MQ_Envelope *ev;
962 LOG (GNUNET_ERROR_TYPE_DEBUG,
963 "transmitted all values, sending DONE\n");
964 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
965 GNUNET_MQ_send (op->mq, ev);
966 /* We now wait until we get a DONE message back
967 * and then wait for our MQ to be flushed and all our
968 * demands be delivered. */
973 struct IBF_Key unsalted_key;
974 unsalt_key (&key, op->state->salt_receive, &unsalted_key);
975 send_offers_for_key (op, unsalted_key);
979 struct GNUNET_MQ_Envelope *ev;
980 struct InquiryMessage *msg;
982 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
983 * the additional complexity. */
984 ev = GNUNET_MQ_msg_extra (msg,
985 sizeof (struct IBF_Key),
986 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
987 msg->salt = htonl (op->state->salt_receive);
988 GNUNET_memcpy (&msg[1],
990 sizeof (struct IBF_Key));
991 LOG (GNUNET_ERROR_TYPE_DEBUG,
992 "sending element inquiry for IBF key %lx\n",
993 (unsigned long) key.key_val);
994 GNUNET_MQ_send (op->mq, ev);
1001 ibf_destroy (diff_ibf);
1007 * Handle an IBF message from a remote peer.
1009 * Reassemble the IBF from multiple pieces, and
1010 * process the whole IBF once possible.
1012 * @param cls the union operation
1013 * @param mh the header of the message
1014 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1015 * #GNUNET_OK otherwise
1018 handle_p2p_ibf (void *cls,
1019 const struct GNUNET_MessageHeader *mh)
1021 struct Operation *op = cls;
1022 const struct IBFMessage *msg;
1023 unsigned int buckets_in_message;
1025 if (ntohs (mh->size) < sizeof (struct IBFMessage))
1027 GNUNET_break_op (0);
1028 fail_union_operation (op);
1029 return GNUNET_SYSERR;
1031 msg = (const struct IBFMessage *) mh;
1032 if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1033 (op->state->phase == PHASE_EXPECT_IBF) )
1035 op->state->phase = PHASE_EXPECT_IBF_CONT;
1036 GNUNET_assert (NULL == op->state->remote_ibf);
1037 LOG (GNUNET_ERROR_TYPE_DEBUG,
1038 "Creating new ibf of size %u\n",
1040 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
1041 op->state->salt_receive = ntohl (msg->salt);
1042 LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
1043 if (NULL == op->state->remote_ibf)
1045 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1046 "Failed to parse remote IBF, closing connection\n");
1047 fail_union_operation (op);
1048 return GNUNET_SYSERR;
1050 op->state->ibf_buckets_received = 0;
1051 if (0 != ntohl (msg->offset))
1053 GNUNET_break_op (0);
1054 fail_union_operation (op);
1055 return GNUNET_SYSERR;
1058 else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1060 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1062 GNUNET_break_op (0);
1063 fail_union_operation (op);
1064 return GNUNET_SYSERR;
1066 if (1<<msg->order != op->state->remote_ibf->size)
1068 GNUNET_break_op (0);
1069 fail_union_operation (op);
1070 return GNUNET_SYSERR;
1072 if (ntohl (msg->salt) != op->state->salt_receive)
1074 GNUNET_break_op (0);
1075 fail_union_operation (op);
1076 return GNUNET_SYSERR;
1084 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1086 if (0 == buckets_in_message)
1088 GNUNET_break_op (0);
1089 fail_union_operation (op);
1090 return GNUNET_SYSERR;
1093 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1095 GNUNET_break_op (0);
1096 fail_union_operation (op);
1097 return GNUNET_SYSERR;
1100 GNUNET_assert (NULL != op->state->remote_ibf);
1102 ibf_read_slice (&msg[1],
1103 op->state->ibf_buckets_received,
1105 op->state->remote_ibf);
1106 op->state->ibf_buckets_received += buckets_in_message;
1108 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1110 LOG (GNUNET_ERROR_TYPE_DEBUG,
1111 "received full ibf\n");
1112 op->state->phase = PHASE_INVENTORY_ACTIVE;
1114 decode_and_send (op))
1116 /* Internal error, best we can do is shut down */
1117 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1118 "Failed to decode IBF, closing connection\n");
1119 return GNUNET_SYSERR;
1127 * Send a result message to the client indicating
1128 * that there is a new element.
1130 * @param op union operation
1131 * @param element element to send
1132 * @param status status to send with the new element
1135 send_client_element (struct Operation *op,
1136 struct GNUNET_SET_Element *element,
1139 struct GNUNET_MQ_Envelope *ev;
1140 struct GNUNET_SET_ResultMessage *rm;
1142 LOG (GNUNET_ERROR_TYPE_DEBUG,
1143 "sending element (size %u) to client\n",
1145 GNUNET_assert (0 != op->spec->client_request_id);
1146 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1149 GNUNET_MQ_discard (ev);
1153 rm->result_status = htons (status);
1154 rm->request_id = htonl (op->spec->client_request_id);
1155 rm->element_type = element->element_type;
1156 GNUNET_memcpy (&rm[1], element->data, element->size);
1157 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1162 * Signal to the client that the operation has finished and
1163 * destroy the operation.
1165 * @param cls operation to destroy
1168 send_done_and_destroy (void *cls)
1170 struct Operation *op = cls;
1171 struct GNUNET_MQ_Envelope *ev;
1172 struct GNUNET_SET_ResultMessage *rm;
1174 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
1175 rm->request_id = htonl (op->spec->client_request_id);
1176 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1177 rm->element_type = htons (0);
1178 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1179 /* Will also call the union-specific cancel function. */
1180 _GSS_operation_destroy (op, GNUNET_YES);
1185 maybe_finish (struct Operation *op)
1187 unsigned int num_demanded;
1189 num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
1191 if (PHASE_FINISH_WAITING == op->state->phase)
1193 LOG (GNUNET_ERROR_TYPE_DEBUG,
1194 "In PHASE_FINISH_WAITING, pending %u demands\n",
1196 if (0 == num_demanded)
1198 struct GNUNET_MQ_Envelope *ev;
1200 op->state->phase = PHASE_DONE;
1201 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1202 GNUNET_MQ_send (op->mq, ev);
1204 /* We now wait until the other peer closes the channel
1205 * after it got all elements from us. */
1208 if (PHASE_FINISH_CLOSING == op->state->phase)
1210 LOG (GNUNET_ERROR_TYPE_DEBUG,
1211 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1213 if (0 == num_demanded)
1215 op->state->phase = PHASE_DONE;
1216 send_done_and_destroy (op);
1223 * Handle an element message from a remote peer.
1224 * Sent by the other peer either because we decoded an IBF and placed a demand,
1225 * or because the other peer switched to full set transmission.
1227 * @param cls the union operation
1228 * @param mh the message
1231 handle_p2p_elements (void *cls,
1232 const struct GNUNET_MessageHeader *mh)
1234 struct Operation *op = cls;
1235 struct ElementEntry *ee;
1236 const struct GNUNET_SET_ElementMessage *emsg;
1237 uint16_t element_size;
1239 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1241 GNUNET_break_op (0);
1242 fail_union_operation (op);
1245 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1247 GNUNET_break_op (0);
1248 fail_union_operation (op);
1252 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1254 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1255 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1256 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1257 ee->element.size = element_size;
1258 ee->element.data = &ee[1];
1259 ee->element.element_type = ntohs (emsg->element_type);
1260 ee->remote = GNUNET_YES;
1261 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1264 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1268 /* We got something we didn't demand, since it's not in our map. */
1269 GNUNET_break_op (0);
1271 fail_union_operation (op);
1275 LOG (GNUNET_ERROR_TYPE_DEBUG,
1276 "Got element (size %u, hash %s) from peer\n",
1277 (unsigned int) element_size,
1278 GNUNET_h2s (&ee->element_hash));
1280 GNUNET_STATISTICS_update (_GSS_statistics,
1281 "# received elements",
1284 GNUNET_STATISTICS_update (_GSS_statistics,
1285 "# exchanged elements",
1289 op->state->received_total += 1;
1291 if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1293 /* Got repeated element. Should not happen since
1294 * we track demands. */
1295 GNUNET_STATISTICS_update (_GSS_statistics,
1296 "# repeated elements",
1303 LOG (GNUNET_ERROR_TYPE_DEBUG,
1304 "Registering new element from remote peer\n");
1305 op->state->received_fresh += 1;
1306 op_register_element (op, ee);
1307 /* only send results immediately if the client wants it */
1308 switch (op->spec->result_mode)
1310 case GNUNET_SET_RESULT_ADDED:
1311 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1313 case GNUNET_SET_RESULT_SYMMETRIC:
1314 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1317 /* Result mode not supported, should have been caught earlier. */
1323 if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
1325 /* The other peer gave us lots of old elements, there's something wrong. */
1326 GNUNET_break_op (0);
1327 fail_union_operation (op);
1336 * Send offers (for GNUNET_Hash-es) in response
1337 * to inquiries (for IBF_Key-s).
1339 * @param cls the union operation
1340 * @param mh the message
1343 handle_p2p_inquiry (void *cls,
1344 const struct GNUNET_MessageHeader *mh)
1346 struct Operation *op = cls;
1347 const struct IBF_Key *ibf_key;
1348 unsigned int num_keys;
1349 struct InquiryMessage *msg;
1351 /* look up elements and send them */
1352 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1354 GNUNET_break_op (0);
1355 fail_union_operation (op);
1358 num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
1359 / sizeof (struct IBF_Key);
1360 if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
1361 != num_keys * sizeof (struct IBF_Key))
1363 GNUNET_break_op (0);
1364 fail_union_operation (op);
1368 msg = (struct InquiryMessage *) mh;
1370 ibf_key = (const struct IBF_Key *) &msg[1];
1371 while (0 != num_keys--)
1373 struct IBF_Key unsalted_key;
1374 unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
1375 send_offers_for_key (op, unsalted_key);
/* Handle a DEMAND message from the remote peer.
 *
 * The bytes following the message header are read as an array of
 * GNUNET_HashCode values.  For every hash the matching element is looked up
 * in the set's element map and sent to the peer in a
 * GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS message.
 *
 * cls: the operation (used as struct Operation *)
 * mh:  the received message
 *
 * NOTE(review): unlike handle_p2p_offer, no check of op->state->phase is
 * visible in this function -- confirm that demands are meant to be accepted
 * in every phase. */
1385 handle_p2p_demand (void *cls,
1386 const struct GNUNET_MessageHeader *mh)
1388 struct Operation *op = cls;
1389 struct ElementEntry *ee;
1390 struct GNUNET_SET_ElementMessage *emsg;
1391 const struct GNUNET_HashCode *hash;
1392 unsigned int num_hashes;
1393 struct GNUNET_MQ_Envelope *ev;
/* Number of whole hash codes that fit into the payload. */
1395 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1396 / sizeof (struct GNUNET_HashCode);
/* A payload that is not an exact multiple of the hash size is a protocol
 * violation: it is reported and the operation is failed. */
1397 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1398 != num_hashes * sizeof (struct GNUNET_HashCode))
1400 GNUNET_break_op (0);
1401 fail_union_operation (op);
/* Walk the hash array that starts right behind the header, consuming one
 * hash and decrementing num_hashes per iteration. */
1405 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1407 hash++, num_hashes--)
1409 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
1412 /* Demand for non-existing element. */
1413 GNUNET_break_op (0);
1414 fail_union_operation (op);
/* The element is stored, but _GSS_is_element_of_operation says it is not
 * part of this operation; this is also treated as a protocol violation. */
1417 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
1419 /* Probably confused lazily copied sets. */
1420 GNUNET_break_op (0);
1421 fail_union_operation (op);
/* Build the reply: an element message followed by the raw element data. */
1424 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
1425 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1426 emsg->reserved = htons (0);
1427 emsg->element_type = htons (ee->element.element_type);
/* NOTE(review): the format string has three conversions (%x, %u, %s) but
 * only two arguments are visible here; the argument consumed by %x is on a
 * line not shown.  If it is a pointer (as in handle_p2p_offer), %p is the
 * matching conversion -- confirm. */
1428 LOG (GNUNET_ERROR_TYPE_DEBUG,
1429 "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1431 (unsigned int) ee->element.size,
1432 GNUNET_h2s (&ee->element_hash));
1433 GNUNET_MQ_send (op->mq, ev);
1434 GNUNET_STATISTICS_update (_GSS_statistics,
1435 "# exchanged elements",
/* Depending on the result mode the local client is additionally told about
 * the element that was just sent to the peer. */
1439 switch (op->spec->result_mode)
1441 case GNUNET_SET_RESULT_ADDED:
1442 /* Nothing to do. */
1444 case GNUNET_SET_RESULT_SYMMETRIC:
1445 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
1448 /* Result mode not supported, should have been caught earlier. */
1457 * Handle offers (of GNUNET_HashCode-s) and
1458 * respond with demands (of GNUNET_HashCode-s).
1460 * @param cls the union operation
1461 * @param mh the message
/* Handle an OFFER message from the remote peer.
 *
 * The bytes following the message header are read as an array of
 * GNUNET_HashCode values.  Each offered hash is checked against the set's
 * element map and against op->state->demanded_hashes; a hash that passes is
 * recorded in demanded_hashes and requested from the peer with its own
 * GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND message (one message per hash).
 *
 * cls: the operation (used as struct Operation *)
 * mh:  the received message */
1464 handle_p2p_offer (void *cls,
1465 const struct GNUNET_MessageHeader *mh)
1467 struct Operation *op = cls;
1468 const struct GNUNET_HashCode *hash;
1469 unsigned int num_hashes;
1471 /* offers are only acceptable in one of the two inventory phases */
1472 if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1473 (op->state->phase != PHASE_INVENTORY_ACTIVE))
1475 GNUNET_break_op (0);
1476 fail_union_operation (op);
/* Number of whole hash codes that fit into the payload. */
1479 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1480 / sizeof (struct GNUNET_HashCode);
/* A payload that is not an exact multiple of the hash size is a protocol
 * violation: it is reported and the operation is failed. */
1481 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
1482 != num_hashes * sizeof (struct GNUNET_HashCode))
1484 GNUNET_break_op (0);
1485 fail_union_operation (op);
/* Walk the hash array that starts right behind the header. */
1489 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1491 hash++, num_hashes--)
1493 struct ElementEntry *ee;
1494 struct GNUNET_MessageHeader *demands;
1495 struct GNUNET_MQ_Envelope *ev;
1497 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
/* An element that already belongs to this operation needs no demand
 * (NOTE(review): the statement taken in that case is on a line not visible
 * here -- presumably the hash is skipped). */
1500 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
/* demanded_hashes remembers what was already requested, so that the same
 * element is not demanded twice. */
1504 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1507 LOG (GNUNET_ERROR_TYPE_DEBUG,
1508 "Skipped sending duplicate demand\n");
1512 GNUNET_assert (GNUNET_OK ==
1513 GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
1516 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
/* %p is the conversion that matches the (void *) argument; the previous %x
 * expects an unsigned int, which is a type mismatch for a pointer. */
1518 LOG (GNUNET_ERROR_TYPE_DEBUG,
1519 "[OP %p] Requesting element (hash %s)\n",
1520 (void *) op, GNUNET_h2s (hash));
/* The demand consists of a bare message header followed by one hash. */
1521 ev = GNUNET_MQ_msg_header_extra (demands,
1522 sizeof (struct GNUNET_HashCode),
1523 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1524 *(struct GNUNET_HashCode *) &demands[1] = *hash;
1525 GNUNET_MQ_send (op->mq, ev);
1531 * Handle a done message from a remote peer
1533 * @param cls the union operation
1534 * @param mh the message
/* Handle a DONE message from the remote peer.
 *
 * The reaction depends on the current phase of the operation:
 *  - PHASE_INVENTORY_PASSIVE: the phase becomes PHASE_FINISH_WAITING.
 *  - PHASE_INVENTORY_ACTIVE:  the phase becomes PHASE_FINISH_CLOSING.
 * The function ends with GNUNET_break_op (0) and fail_union_operation (op);
 * NOTE(review): presumably that is reached only when neither phase matched,
 * but the statements that leave the two branches are not visible here.
 *
 * cls: the operation (used as struct Operation *)
 * mh:  the received message; it is not read in the visible lines */
1537 handle_p2p_done (void *cls,
1538 const struct GNUNET_MessageHeader *mh)
1540 struct Operation *op = cls;
1542 if (op->state->phase == PHASE_INVENTORY_PASSIVE)
1544 /* We got all requests, but still have to send our elements in response. */
1546 op->state->phase = PHASE_FINISH_WAITING;
1548 LOG (GNUNET_ERROR_TYPE_DEBUG,
1549 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
1550 /* The active peer is done sending offers
1551 * and inquiries. This means that all
1552 * our responses to that (demands and offers)
1553 * must be in flight (queued or in mesh).
1555 * We should notify the active peer once
1556 * all our demands are satisfied, so that the active
1557 * peer can quit if we gave him everything.
1562 if (op->state->phase == PHASE_INVENTORY_ACTIVE)
1564 LOG (GNUNET_ERROR_TYPE_DEBUG,
1565 "got DONE (as active partner), waiting to finish\n");
1566 /* All demands of the other peer are satisfied,
1567 * and we processed all offers, thus we know
1568 * exactly what our demands must be.
1570 * We'll close the channel
1571 * to the other peer once our demands are met.
1573 op->state->phase = PHASE_FINISH_CLOSING;
1577 GNUNET_break_op (0);
1578 fail_union_operation (op);
1583 * Initiate operation to evaluate a set union with a remote peer.
1585 * @param op operation to perform (to be initialized)
1586 * @param opaque_context message to be transmitted to the listener
1587 * to convince him to accept, may be NULL
1590 union_evaluate (struct Operation *op,
1591 const struct GNUNET_MessageHeader *opaque_context)
1593 struct GNUNET_MQ_Envelope *ev;
1594 struct OperationRequestMessage *msg;
1596 GNUNET_assert (NULL == op->state);
1597 op->state = GNUNET_new (struct OperationState);
1598 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1599 /* copy the current generation's strata estimator for this operation */
1600 op->state->se = strata_estimator_dup (op->spec->set->state->se);
/* At this point the operation owns a fresh OperationState (asserted to be
 * absent before), a new demanded_hashes map and its own duplicate of the
 * set's strata estimator. */
1601 /* we started the operation, thus we have to send the operation request */
1602 op->state->phase = PHASE_EXPECT_SE;
/* NOTE(review): both salts start from the literal 42, the same literal that
 * union_accept uses; whether a fixed value is intended is not visible here. */
1603 op->state->salt_receive = op->state->salt_send = 42;
1604 LOG (GNUNET_ERROR_TYPE_DEBUG,
1605 "Initiating union operation evaluation\n");
/* Two statistics counters are updated: the total number of union
 * operations and the number of operations initiated by this side. */
1606 GNUNET_STATISTICS_update (_GSS_statistics,
1607 "# of total union operations",
1610 GNUNET_STATISTICS_update (_GSS_statistics,
1611 "# of initiated union operations",
/* Build the operation request envelope.  The existing comment below says
 * the failure path is for an oversized context message; in that path the
 * client of the set is dropped. */
1614 ev = GNUNET_MQ_msg_nested_mh (msg,
1615 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1619 /* the context message is too large */
1621 GNUNET_SERVICE_client_drop (op->spec->set->client);
/* Mark the request as a union operation and queue it on the operation's
 * message queue. */
1624 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1625 GNUNET_MQ_send (op->mq,
/* Only the debug log text depends on whether a context message was given. */
1628 if (NULL != opaque_context)
1629 LOG (GNUNET_ERROR_TYPE_DEBUG,
1630 "sent op request with context message\n");
1632 LOG (GNUNET_ERROR_TYPE_DEBUG,
1633 "sent op request without context message\n");
1638 * Accept a union operation request from a remote peer.
1639 * Only initializes the private operation state.
1641 * @param op operation that will be accepted as a union operation
/* Set up the union-specific state of an accepted operation and start it.
 *
 * Asserts that the operation has no state yet, updates the "accepted" and
 * "total" union operation counters, allocates the OperationState with its
 * own duplicate of the set's strata estimator and a new demanded_hashes
 * map, and finally calls send_strata_estimator.
 *
 * op: the operation being accepted */
1644 union_accept (struct Operation *op)
1646 LOG (GNUNET_ERROR_TYPE_DEBUG,
1647 "accepting set union operation\n");
1648 GNUNET_assert (NULL == op->state);
1650 GNUNET_STATISTICS_update (_GSS_statistics,
1651 "# of accepted union operations",
1654 GNUNET_STATISTICS_update (_GSS_statistics,
1655 "# of total union operations",
1659 op->state = GNUNET_new (struct OperationState);
1660 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1661 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
/* NOTE(review): both salts start from the literal 42, the same literal that
 * union_evaluate uses.  op->state->phase is not assigned in this function;
 * presumably the initial value of the freshly allocated state or
 * send_strata_estimator takes care of it -- confirm. */
1662 op->state->salt_receive = op->state->salt_send = 42;
1663 /* kick off the operation */
1664 send_strata_estimator (op);
1669 * Create a new set supporting the union operation
1671 * We maintain one strata estimator per set and then manipulate it over the
1672 * lifetime of the set, as recreating a strata estimator would be expensive.
1674 * @return the newly created set, NULL on error
/* Allocate the union-specific state of a new set.
 *
 * The state gets a strata estimator created from SE_STRATA_COUNT,
 * SE_IBF_SIZE and SE_IBF_HASH_NUM (the constants defined at the top of
 * this file).  If the estimator cannot be created, an error is logged and
 * the state is freed again.
 *
 * NOTE(review): the "union set created" debug message is logged before
 * anything has been allocated, so it also appears when creation fails. */
1676 static struct SetState *
1677 union_set_create (void)
1679 struct SetState *set_state;
1681 LOG (GNUNET_ERROR_TYPE_DEBUG,
1682 "union set created\n");
1683 set_state = GNUNET_new (struct SetState);
1684 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1685 SE_IBF_SIZE, SE_IBF_HASH_NUM);
/* Failure path: report the problem and release the partially built state. */
1686 if (NULL == set_state->se)
1688 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1689 "Failed to allocate strata estimator\n");
1690 GNUNET_free (set_state);
1698 * Add the element from the given element message to the set.
1700 * @param set_state state of the set we want to add to
1701 * @param ee the element to add to the set
/* Register an element with the set's strata estimator: the IBF key derived
 * from the element's hash (get_ibf_key) is inserted into set_state->se. */
1704 union_add (struct SetState *set_state, struct ElementEntry *ee)
1706 strata_estimator_insert (set_state->se,
1707 get_ibf_key (&ee->element_hash));
1712 * Remove the element given in the element message from the set.
1713 * Only marks the element as removed, so that older set operations can still exchange it.
1715 * @param set_state state of the set to remove from
1716 * @param ee set element to remove
/* Counterpart of union_add: the IBF key derived from the element's hash
 * (get_ibf_key) is removed from the set's strata estimator set_state->se. */
1719 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1721 strata_estimator_remove (set_state->se,
1722 get_ibf_key (&ee->element_hash));
1727 * Destroy a set that supports the union operation.
1729 * @param set_state the set to destroy
/* Release the union-specific state of a set: the strata estimator is
 * destroyed if one is attached, then the state structure itself is freed. */
1732 union_set_destroy (struct SetState *set_state)
1734 if (NULL != set_state->se)
1736 strata_estimator_destroy (set_state->se);
/* Clear the pointer so the destroyed estimator cannot be reached again
 * through set_state before the structure is freed. */
1737 set_state->se = NULL;
1739 GNUNET_free (set_state);
1744 * Dispatch messages for a union operation.
1746 * @param op the state of the union evaluate operation
1747 * @param mh the received message
1748 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
1749 * #GNUNET_OK otherwise
/* Route a received peer-to-peer message to its handler, selected by the
 * message type taken from the header (converted from network byte order).
 *
 * op: the operation the message belongs to
 * mh: the received message */
1752 union_handle_p2p_message (struct Operation *op,
1753 const struct GNUNET_MessageHeader *mh)
1755 //LOG (GNUNET_ERROR_TYPE_DEBUG,
1756 // "received p2p message (t: %u, s: %u)\n",
1757 // ntohs (mh->type),
1758 // ntohs (mh->size));
1759 switch (ntohs (mh->type))
/* The IBF and strata estimator handlers produce the dispatcher's return
 * value themselves.  The SE and SEC types share one handler and differ only
 * in the flag passed as third argument. */
1761 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1762 return handle_p2p_ibf (op, mh);
1763 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1764 return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
1765 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
1766 return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
/* The remaining handlers are called without using a return value. */
1767 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1768 handle_p2p_elements (op, mh);
1770 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
1771 handle_p2p_inquiry (op, mh);
1773 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
1774 handle_p2p_done (op, mh);
1776 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
1777 handle_p2p_offer (op, mh);
1779 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
1780 handle_p2p_demand (op, mh);
1783 /* Something wrong with cadet's message handlers? */
1791 * Handler for peer-disconnects, notifies the client
1792 * about the aborted operation in case the op was not concluded.
1794 * @param op the destroyed operation
/* React to the remote peer going away.
 *
 * If the operation has not reached PHASE_DONE, the client is sent a
 * GNUNET_MESSAGE_TYPE_SET_RESULT message with status
 * GNUNET_SET_STATUS_FAILURE, a warning is logged and the operation is
 * destroyed.  Otherwise the disconnect is only logged and, if the client
 * has not been sent "done" yet (client_done_sent is GNUNET_NO),
 * send_done_and_destroy is called.
 *
 * op: the operation whose peer disconnected */
1797 union_peer_disconnect (struct Operation *op)
1799 if (PHASE_DONE != op->state->phase)
1801 struct GNUNET_MQ_Envelope *ev;
1802 struct GNUNET_SET_ResultMessage *msg;
/* Failure notification for the client that requested the operation; it
 * carries the client's request id and no element. */
1804 ev = GNUNET_MQ_msg (msg,
1805 GNUNET_MESSAGE_TYPE_SET_RESULT);
1806 msg->request_id = htonl (op->spec->client_request_id);
1807 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1808 msg->element_type = htons (0);
1809 GNUNET_MQ_send (op->spec->set->client_mq,
1811 LOG (GNUNET_ERROR_TYPE_WARNING,
1812 "other peer disconnected prematurely, phase %u\n",
1814 _GSS_operation_destroy (op,
1818 // else: the session has already been concluded
1819 LOG (GNUNET_ERROR_TYPE_DEBUG,
1820 "other peer disconnected (finished)\n");
1821 if (GNUNET_NO == op->state->client_done_sent)
1822 send_done_and_destroy (op);
1827 * Copy union-specific set state.
1829 * @param set source set for copying the union state
1830 * @return a copy of the union-specific set state
/* Create the union-specific state for a copy of a set: a new SetState is
 * allocated and receives its own duplicate of the source set's strata
 * estimator.  The source set must already have a state with a strata
 * estimator; this is asserted.
 *
 * set: the set whose union state is copied */
1832 static struct SetState *
1833 union_copy_state (struct Set *set)
1835 struct SetState *new_state;
1837 new_state = GNUNET_new (struct SetState);
1838 GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
1839 new_state->se = strata_estimator_dup (set->state->se);
1846 * Get the table with implementing functions for
1849 * @return the operation specific VTable
1851 const struct SetVT *
/* Function table for the union operation: every member visible here points
 * at the union_* function of the same purpose.
 * NOTE(review): no .add member is visible between .msg_handler and .remove
 * although union_add exists in this file; presumably it sits on a line not
 * shown here -- confirm.  union_op_cancel is not defined in the visible
 * part of the file. */
1854 static const struct SetVT union_vt = {
1855 .create = &union_set_create,
1856 .msg_handler = &union_handle_p2p_message,
1858 .remove = &union_remove,
1859 .destroy_set = &union_set_destroy,
1860 .evaluate = &union_evaluate,
1861 .accept = &union_accept,
1862 .peer_disconnect = &union_peer_disconnect,
1863 .cancel = &union_op_cancel,
1864 .copy_state = &union_copy_state,