2 This file is part of GNUnet
3 (C) 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file set/gnunet-service-set.c
23 * @brief two-peer set operations
24 * @author Florian Dold
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
38 * Number of IBFs in a strata estimator.
40 #define SE_STRATA_COUNT 32
42 * Size of the IBFs in the strata estimator.
44 #define SE_IBF_SIZE 80
46 * hash num parameter for the difference digests and strata estimators
48 #define SE_IBF_HASH_NUM 3
51 * Number of buckets that can be transmitted in one message.
53 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
56 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
57 * Choose this value so that computing the IBF is still cheaper
58 * than transmitting all values.
60 #define MAX_IBF_ORDER (16)
64 * Current phase we are in for a union operation
66 enum UnionOperationPhase
69 * We sent the request message, and expect a strata estimator
73 * We sent the strata estimator, and expect an IBF
77 * We know what type of IBF the other peer wants to send us,
78 * and expect the remaining parts
80 PHASE_EXPECT_IBF_CONT,
82 * We are sending request and elements,
83 * and thus only expect elements from the other peer.
85 PHASE_EXPECT_ELEMENTS,
87 * We are expecting elements and requests, and send
88 * requested elements back to the other peer.
90 PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
92 * The protocol is over.
93 * Results may still have to be sent to the client.
100 * State of an evaluate operation
103 struct UnionEvaluateOperation
106 * Local set the operation is evaluated on.
111 * Peer with the remote set
113 struct GNUNET_PeerIdentity peer;
116 * Application-specific identifier
118 struct GNUNET_HashCode app_id;
121 * Context message, given to us
122 * by the client, may be NULL.
124 struct GNUNET_MessageHeader *context_msg;
127 * Stream socket connected to the other peer
129 struct GNUNET_STREAM_Socket *socket;
132 * Message queue for the peer on the other
135 struct GNUNET_MQ_MessageQueue *mq;
138 * Type of this operation
140 enum GNUNET_SET_OperationType operation;
143 * Request ID to multiplex set operations to
144 * the client inhabiting the set.
149 * Number of ibf buckets received
151 unsigned int ibf_buckets_received;
154 * Copy of the set's strata estimator at the time of
155 * creation of this operation
157 struct StrataEstimator *se;
160 * The ibf we currently receive
162 struct InvertibleBloomFilter *remote_ibf;
165 * IBF of the set's element.
167 struct InvertibleBloomFilter *local_ibf;
170 * Maps IBF-Keys (specific to the current salt) to elements.
172 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
175 * Current state of the operation.
177 enum UnionOperationPhase phase;
180 * Salt to use for this operation.
185 * Generation in which the operation handle
188 unsigned int generation_created;
191 * Evaluate operations are held in
194 struct UnionEvaluateOperation *next;
197 * Evaluate operations are held in
200 struct UnionEvaluateOperation *prev;
205 * Information about the element in a set.
206 * All elements are stored in a hash-table
207 * from their hash-code to their 'struct Element',
208 * so that the remove and add operations are reasonably
214 * The actual element. The data for the element
215 * should be allocated at the end of this struct.
217 struct GNUNET_SET_Element element;
220 * Hash of the element.
221 * Will be used to derive the different IBF keys
222 * for different salts.
224 struct GNUNET_HashCode element_hash;
227 * Generation the element was added by the client.
228 * Operations of earlier generations will not consider the element.
230 unsigned int generation_added;
233 * GNUNET_YES if the element has been removed in some generation.
238 * Generation the element was removed by the client.
239 * Operations of later generations will not consider the element.
240 * Only valid if is_removed is GNUNET_YES.
242 unsigned int generation_removed;
245 * GNUNET_YES if the element is a remote element, and does not belong
246 * to the operation's set.
253 * Information about the element used for
254 * a specific union operation.
259 * IBF key for the entry, derived from the current salt.
261 struct IBF_Key ibf_key;
264 * The actual element associated with the key
266 struct ElementEntry *element;
269 * Element that collides with this element
272 struct KeyEntry *next_colliding;
276 * Used as a closure for sending elements
277 * with a specific IBF key.
279 struct SendElementClosure
282 * The IBF key whose matching elements should be
285 struct IBF_Key ibf_key;
288 * Operation for which the elements
291 struct UnionEvaluateOperation *eo;
296 * Extra state required for efficient set union.
301 * The strata estimator is only generated once for
303 * The IBF keys are derived from the element hashes with
306 struct StrataEstimator *se;
309 * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
311 struct GNUNET_CONTAINER_MultiHashMap *elements;
314 * Evaluate operations are held in
317 struct UnionEvaluateOperation *ops_head;
320 * Evaluate operations are held in
323 struct UnionEvaluateOperation *ops_tail;
326 * Current generation, that is, number of
327 * previously executed operations on this set
329 unsigned int current_generation;
336 * Destroy a union operation, and free all resources
337 * associated with it.
339 * @param eo the union operation to destroy
342 destroy_union_operation (struct UnionEvaluateOperation *eo)
344 GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
345 eo->set->state.u->ops_tail,
348 /* FIXME: free and destroy everything else */
353 * Inform the client that the union operation has failed,
354 * and proceed to destroy the evaluate operation.
356 * @param eo the union operation to fail
359 fail_union_operation (struct UnionEvaluateOperation *eo)
361 struct GNUNET_MQ_Message *mqm;
362 struct ResultMessage *msg;
364 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
365 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
366 msg->request_id = eo->request_id;
367 GNUNET_MQ_send (eo->set->client_mq, mqm);
368 destroy_union_operation (eo);
373 * Derive the IBF key from a hash code and
376 * @param src the hash code
377 * @param salt salt to use
378 * @return the derived IBF key
380 static struct IBF_Key
381 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
385 GNUNET_CRYPTO_hkdf (&key, sizeof (key),
386 GCRY_MD_SHA512, GCRY_MD_SHA256,
388 &salt, sizeof (salt),
395 * Send a request for the evaluate operation to a remote peer
397 * @param eo operation with the other peer
400 send_operation_request (struct UnionEvaluateOperation *eo)
402 struct GNUNET_MQ_Message *mqm;
403 struct OperationRequestMessage *msg;
405 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST);
406 if (NULL != eo->context_msg)
407 if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size)))
409 /* the context message is too large */
410 _GSS_client_disconnect (eo->set->client);
411 GNUNET_MQ_discard (mqm);
415 msg->operation = eo->operation;
416 msg->app_id = eo->app_id;
417 GNUNET_MQ_send (eo->mq, mqm);
419 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
424 * Iterator to create the mapping between ibf keys
425 * and element entries.
428 * @param key current key code
429 * @param value value in the hash map
430 * @return GNUNET_YES if we should continue to
435 insert_element_iterator (void *cls,
439 struct KeyEntry *const new_k = cls;
440 struct KeyEntry *old_k = value;
442 GNUNET_assert (NULL != old_k);
445 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
447 new_k->next_colliding = old_k;
448 old_k->next_colliding = new_k;
451 old_k = old_k->next_colliding;
452 } while (NULL != old_k);
458 * Insert an element into the union operation's
459 * key-to-element mapping
461 * @param the union operation
462 * @param ee the element entry
465 insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
468 struct IBF_Key ibf_key;
471 ibf_key = get_ibf_key (&ee->element_hash, eo->salt);
472 k = GNUNET_new (struct KeyEntry);
474 k->ibf_key = ibf_key;
475 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
476 (uint32_t) ibf_key.key_val,
477 insert_element_iterator, k);
478 /* was the element inserted into a colliding bucket? */
479 if (GNUNET_SYSERR == ret)
481 GNUNET_assert (NULL != k->next_colliding);
484 GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
485 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
490 * Insert a key into an ibf.
494 * @param value the key entry to get the key from
497 prepare_ibf_iterator (void *cls,
501 struct InvertibleBloomFilter *ibf = cls;
502 struct KeyEntry *ke = value;
504 ibf_insert (ibf, ke->ibf_key);
510 * Iterator for initializing the
511 * key-to-element mapping of a union operation
513 * @param cls the union operation
515 * @param value the element entry to insert
516 * into the key-to-element mapping
519 init_key_to_element_iterator (void *cls,
520 const struct GNUNET_HashCode *key,
523 struct UnionEvaluateOperation *eo = cls;
524 struct ElementEntry *e = value;
526 /* make sure that the element belongs to the set at the time
527 * of creating the operation */
528 if ( (e->generation_added > eo->generation_created) ||
529 ( (GNUNET_YES == e->removed) &&
530 (e->generation_removed < eo->generation_created)))
533 insert_element (eo, e);
539 * Create an ibf with the operation's elements
540 * of the specified size
542 * @param eo the union operation
543 * @param size size of the ibf to create
546 prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
548 if (NULL == eo->key_to_element)
551 len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
552 eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len);
553 GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
554 init_key_to_element_iterator, eo);
556 if (NULL != eo->local_ibf)
557 ibf_destroy (eo->local_ibf);
558 eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
559 GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
560 prepare_ibf_iterator, eo->local_ibf);
565 * Send an ibf of appropriate size.
567 * @param eo the union operation
568 * @param ibf_order order of the ibf to send, size=2^order
571 send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
573 unsigned int buckets_sent = 0;
574 struct InvertibleBloomFilter *ibf;
576 prepare_ibf (eo, 1<<ibf_order);
580 while (buckets_sent < (1 << ibf_order))
582 unsigned int buckets_in_message;
583 struct GNUNET_MQ_Message *mqm;
584 struct IBFMessage *msg;
586 buckets_in_message = (1 << ibf_order) - buckets_sent;
587 /* limit to maximum */
588 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
589 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
591 mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
592 GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
593 msg->order = htons (ibf_order);
594 msg->offset = htons (buckets_sent);
595 ibf_write_slice (ibf, buckets_sent,
596 buckets_in_message, &msg[1]);
597 buckets_sent += buckets_in_message;
598 GNUNET_MQ_send (eo->mq, mqm);
601 eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
606 * Send a strata estimator to the remote peer.
608 * @param eo the union operation with the remote peer
611 send_strata_estimator (struct UnionEvaluateOperation *eo)
613 struct GNUNET_MQ_Message *mqm;
614 struct GNUNET_MessageHeader *strata_msg;
616 mqm = GNUNET_MQ_msg_header_extra (strata_msg,
617 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
618 GNUNET_MESSAGE_TYPE_SET_P2P_SE);
619 strata_estimator_write (eo->set->state.u->se, &strata_msg[1]);
620 GNUNET_MQ_send (eo->mq, mqm);
621 eo->phase = PHASE_EXPECT_IBF;
626 * Compute the necessary order of an ibf
627 * from the size of the symmetric set difference.
629 * @param diff the difference
630 * @return the required size of the ibf
633 get_order_from_difference (unsigned int diff)
635 unsigned int ibf_order;
638 while ((1<<ibf_order) < (2 * diff))
640 if (ibf_order > MAX_IBF_ORDER)
641 ibf_order = MAX_IBF_ORDER;
647 * Handle a strata estimator from a remote peer
649 * @param the union operation
650 * @param mh the message
653 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
655 struct UnionEvaluateOperation *eo = cls;
656 struct StrataEstimator *remote_se;
659 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se\n");
661 if (eo->phase != PHASE_EXPECT_SE)
663 fail_union_operation (eo);
667 remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
669 strata_estimator_read (&mh[1], remote_se);
670 GNUNET_assert (NULL != eo->se);
671 diff = strata_estimator_difference (remote_se, eo->se);
672 strata_estimator_destroy (remote_se);
673 strata_estimator_destroy (eo->se);
675 send_ibf (eo, get_order_from_difference (diff));
681 * Iterator to send elements to a remote peer
683 * @param cls closure with the element key and the union operation
685 * @param value the key entry
688 send_element_iterator (void *cls,
692 struct SendElementClosure *sec = cls;
693 struct IBF_Key ibf_key = sec->ibf_key;
694 struct UnionEvaluateOperation *eo = sec->eo;
695 struct KeyEntry *ke = value;
697 if (ke->ibf_key.key_val != ibf_key.key_val)
701 const struct GNUNET_SET_Element *const element = &ke->element->element;
702 struct GNUNET_MQ_Message *mqm;
704 GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
705 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
706 if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
709 GNUNET_MQ_discard (mqm);
712 GNUNET_MQ_send (eo->mq, mqm);
718 * Send all elements that have the specified IBF key
719 * to the remote peer of the union operation
721 * @param eo union operation
722 * @param ibf_key IBF key of interest
725 send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key)
727 struct SendElementClosure send_cls;
729 send_cls.ibf_key = ibf_key;
731 GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
732 send_element_iterator, &send_cls);
738 * Decode which elements are missing on each side, and
739 * send the appropriate elemens and requests
741 * @param eo union operation
744 decode_and_send (struct UnionEvaluateOperation *eo)
748 struct InvertibleBloomFilter *diff_ibf;
750 GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
752 prepare_ibf (eo, eo->remote_ibf->size);
753 diff_ibf = ibf_dup (eo->local_ibf);
754 ibf_subtract (diff_ibf, eo->remote_ibf);
760 res = ibf_decode (diff_ibf, &side, &key);
761 if (GNUNET_SYSERR == res)
763 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "decoding failed, sending larger ibf (size %u)\n",
765 send_ibf (eo, diff_ibf->size * 2);
766 ibf_destroy (diff_ibf);
769 if (GNUNET_NO == res)
771 struct GNUNET_MQ_Message *mqm;
773 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
774 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
775 GNUNET_MQ_send (eo->mq, mqm);
780 send_elements_for_key (eo, key);
784 struct GNUNET_MQ_Message *mqm;
785 struct GNUNET_MessageHeader *msg;
787 /* FIXME: before sending the request, check if we may just have the element */
788 /* FIXME: merge multiple requests */
789 mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
790 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
791 *(struct IBF_Key *) &msg[1] = key;
792 GNUNET_MQ_send (eo->mq, mqm);
799 * Handle an IBF message from a remote peer.
801 * @param cls the union operation
802 * @param mh the header of the message
805 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
807 struct UnionEvaluateOperation *eo = cls;
808 struct IBFMessage *msg = (struct IBFMessage *) mh;
809 unsigned int buckets_in_message;
811 if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
812 (eo->phase == PHASE_EXPECT_IBF) )
814 eo->phase = PHASE_EXPECT_IBF_CONT;
815 GNUNET_assert (NULL == eo->remote_ibf);
816 eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
817 if (0 != ntohs (msg->offset))
820 fail_union_operation (eo);
823 else if (eo->phase == PHASE_EXPECT_IBF_CONT)
825 if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
826 (1<<msg->order != eo->remote_ibf->size) )
829 fail_union_operation (eo);
833 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
835 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
838 fail_union_operation (eo);
841 ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
842 eo->ibf_buckets_received += buckets_in_message;
844 if (eo->ibf_buckets_received == eo->remote_ibf->size)
846 eo->phase = PHASE_EXPECT_ELEMENTS;
847 decode_and_send (eo);
853 * Send an element to the client of the operations's set.
855 * @param eo union operation
856 * @param element element to send
859 send_client_element (struct UnionEvaluateOperation *eo,
860 struct GNUNET_SET_Element *element)
862 struct GNUNET_MQ_Message *mqm;
863 struct ResultMessage *rm;
865 GNUNET_assert (0 != eo->request_id);
866 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
867 if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
869 GNUNET_MQ_discard (mqm);
874 GNUNET_MQ_send (eo->mq, mqm);
879 * Handle an element message from a remote peer.
881 * @param cls the union operation
882 * @param mh the message
885 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
887 struct UnionEvaluateOperation *eo = cls;
888 struct ElementEntry *ee;
889 uint16_t element_size;
891 if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
892 (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
894 fail_union_operation (eo);
898 element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
899 ee = GNUNET_malloc (sizeof *eo + element_size);
900 ee->element.data = &ee[1];
901 memcpy (ee->element.data, &mh[1], element_size);
902 ee->remote = GNUNET_YES;
904 insert_element (eo, ee);
905 send_client_element (eo, &ee->element);
910 * Handle an element request from a remote peer.
912 * @param cls the union operation
913 * @param mh the message
916 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
918 struct UnionEvaluateOperation *eo = cls;
919 struct IBF_Key *ibf_key;
920 unsigned int num_keys;
922 /* look up elements and send them */
923 if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
925 fail_union_operation (eo);
930 num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
932 if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
934 fail_union_operation (eo);
939 ibf_key = (struct IBF_Key *) &mh[1];
940 while (0 != num_keys--)
942 send_elements_for_key (eo, *ibf_key);
949 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
951 struct UnionEvaluateOperation *eo = cls;
953 if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
955 /* we got all requests, but still have to send our elements as response */
956 struct GNUNET_MQ_Message *mqm;
958 eo->phase = PHASE_FINISHED;
959 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
960 GNUNET_MQ_send (eo->mq, mqm);
963 if (eo->phase == PHASE_EXPECT_ELEMENTS)
966 eo->phase = PHASE_FINISHED;
970 fail_union_operation (eo);
975 * The handlers array, used for both evaluate and accept
977 static const struct GNUNET_MQ_Handler union_handlers[] = {
978 {handle_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS},
979 {handle_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_P2P_SE},
980 {handle_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_P2P_IBF},
981 {handle_p2p_element_requests, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS},
982 {handle_p2p_done, GNUNET_MESSAGE_TYPE_SET_P2P_DONE},
983 GNUNET_MQ_HANDLERS_END
988 * Functions of this type will be called when a stream is established
990 * @param cls the closure from GNUNET_STREAM_open
991 * @param socket socket to use to communicate with the
992 * other side (read/write)
995 stream_open_cb (void *cls,
996 struct GNUNET_STREAM_Socket *socket)
998 struct UnionEvaluateOperation *eo = cls;
1000 GNUNET_assert (NULL == eo->mq);
1001 GNUNET_assert (socket == eo->socket);
1003 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "open cb successful\n");
1005 eo->mq = GNUNET_MQ_queue_for_stream_socket (eo->socket,
1006 union_handlers, eo);
1007 /* we started the operation, thus we have to send the operation request */
1008 send_operation_request (eo);
1009 eo->phase = PHASE_EXPECT_SE;
1015 _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set)
1017 struct UnionEvaluateOperation *eo;
1019 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation\n");
1021 eo = GNUNET_new (struct UnionEvaluateOperation);
1024 eo->request_id = htons(m->request_id);
1025 eo->se = strata_estimator_dup (set->state.u->se);
1026 eo->salt = ntohs (m->salt);
1027 eo->app_id = m->app_id;
1029 GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET,
1031 GNUNET_STREAM_OPTION_END);
1036 _GSS_union_accept (struct AcceptMessage *m, struct Set *set,
1037 struct Incoming *incoming)
1039 struct UnionEvaluateOperation *eo;
1041 eo = GNUNET_new (struct UnionEvaluateOperation);
1042 eo->generation_created = set->state.u->current_generation++;
1044 eo->peer = incoming->peer;
1045 eo->salt = ntohs (incoming->salt);
1046 eo->request_id = m->request_id;
1047 eo->se = strata_estimator_dup (set->state.u->se);
1049 eo->mq = incoming->mq;
1050 /* the peer's socket is now ours, we'll receive all messages */
1051 GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo);
1052 /* kick off the operation */
1053 send_strata_estimator (eo);
1058 _GSS_union_set_create (void)
1062 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set created\n");
1064 set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
1065 set->state.u = (struct UnionState *) &set[1];
1066 set->operation = GNUNET_SET_OPERATION_UNION;
1067 set->state.u->se = strata_estimator_create (SE_STRATA_COUNT,
1068 SE_IBF_SIZE, SE_IBF_HASH_NUM);
1074 _GSS_union_add (struct ElementMessage *m, struct Set *set)
1076 struct ElementEntry *ee;
1077 struct ElementEntry *ee_dup;
1078 uint16_t element_size;
1080 element_size = ntohs (m->header.size) - sizeof *m;
1081 ee = GNUNET_malloc (element_size + sizeof *ee);
1082 ee->element.size = element_size;
1083 ee->element.data = &ee[1];
1084 memcpy (ee->element.data, &m[1], element_size);
1085 GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
1086 ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash);
1089 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
1093 GNUNET_CONTAINER_multihashmap_put (set->state.u->elements, &ee->element_hash, ee,
1094 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1095 strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash, 0));
1100 * Remove the element given in the element message from the set.
1101 * Only marks the element as removed, so that older set operations can still exchange it.
1103 * @param m message with the element
1104 * @param set set to remove the element from
1107 _GSS_union_remove (struct ElementMessage *m, struct Set *set)
1109 struct GNUNET_HashCode hash;
1110 struct ElementEntry *ee;
1112 GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash);
1114 ee = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &hash);
1117 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
1120 if (GNUNET_YES == ee->removed)
1122 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
1125 ee->removed = GNUNET_YES;
1126 ee->generation_removed = set->state.u->current_generation;