/*
      This file is part of GNUnet
      (C) 2013 Christian Grothoff (and other contributing authors)

      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
      by the Free Software Foundation; either version 2, or (at your
      option) any later version.

      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      General Public License for more details.

      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
      Boston, MA 02111-1307, USA.
*/
/**
 * @file set/gnunet-service-set.c
 * @brief two-peer set operations
 * @author Florian Dold
 */
28 #include "gnunet-service-set.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_crypto_lib.h"
32 #include "strata_estimator.h"
33 #include "set_protocol.h"
/**
 * Number of IBFs in a strata estimator.
 */
#define SE_STRATA_COUNT 32

/**
 * Size of the IBFs in the strata estimator.
 */
#define SE_IBF_SIZE 80

/**
 * hash num parameter for the difference digests and strata estimators
 */
#define SE_IBF_HASH_NUM 3

/**
 * Number of buckets that can be transmitted in one message.
 */
#define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)

/**
 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
 * Choose this value so that computing the IBF is still cheaper
 * than transmitting all values.
 */
#define MAX_IBF_ORDER (16)
/**
 * Current phase we are in for a union operation
 */
enum UnionOperationPhase
{
  /**
   * We sent the request message, and expect a strata estimator
   */
  PHASE_EXPECT_SE,

  /**
   * We sent the strata estimator, and expect an IBF
   */
  PHASE_EXPECT_IBF,

  /**
   * We know what type of IBF the other peer wants to send us,
   * and expect the remaining parts
   */
  PHASE_EXPECT_IBF_CONT,

  /**
   * We are sending request and elements,
   * and thus only expect elements from the other peer.
   */
  PHASE_EXPECT_ELEMENTS,

  /**
   * We are expecting elements and requests, and send
   * requested elements back to the other peer.
   */
  PHASE_EXPECT_ELEMENTS_AND_REQUESTS,

  /**
   * The protocol is over.
   * Results may still have to be sent to the client.
   */
  PHASE_FINISHED
};
100 * State of an evaluate operation
103 struct UnionEvaluateOperation
106 * Local set the operation is evaluated on.
111 * Peer with the remote set
113 struct GNUNET_PeerIdentity peer;
116 * Application-specific identifier
118 struct GNUNET_HashCode app_id;
121 * Context message, given to us
122 * by the client, may be NULL.
124 struct GNUNET_MessageHeader *context_msg;
127 * Stream socket connected to the other peer
129 struct GNUNET_STREAM_Socket *socket;
132 * Message queue for the peer on the other
135 struct GNUNET_MQ_MessageQueue *mq;
138 * Type of this operation
140 enum GNUNET_SET_OperationType operation;
143 * Request ID to multiplex set operations to
144 * the client inhabiting the set.
149 * Number of ibf buckets received
151 unsigned int ibf_buckets_received;
154 * Copy of the set's strata estimator at the time of
155 * creation of this operation
157 struct StrataEstimator *se;
160 * The ibf we currently receive
162 struct InvertibleBloomFilter *remote_ibf;
165 * IBF of the set's element.
167 struct InvertibleBloomFilter *local_ibf;
170 * Maps IBF-Keys (specific to the current salt) to elements.
172 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
175 * Current state of the operation.
177 enum UnionOperationPhase phase;
180 * Salt to use for this operation.
185 * Generation in which the operation handle
188 unsigned int generation_created;
191 * Evaluate operations are held in
194 struct UnionEvaluateOperation *next;
197 * Evaluate operations are held in
200 struct UnionEvaluateOperation *prev;
205 * Information about the element in a set.
206 * All elements are stored in a hash-table
207 * from their hash-code to their 'struct Element',
208 * so that the remove and add operations are reasonably
214 * The actual element. The data for the element
215 * should be allocated at the end of this struct.
217 struct GNUNET_SET_Element element;
220 * Hash of the element.
221 * Will be used to derive the different IBF keys
222 * for different salts.
224 struct GNUNET_HashCode element_hash;
227 * Generation the element was added by the client.
228 * Operations of earlier generations will not consider the element.
230 unsigned int generation_added;
233 * GNUNET_YES if the element has been removed in some generation.
238 * Generation the element was removed by the client.
239 * Operations of later generations will not consider the element.
240 * Only valid if is_removed is GNUNET_YES.
242 unsigned int generation_removed;
245 * GNUNET_YES if the element is a remote element, and does not belong
246 * to the operation's set.
253 * Information about the element used for
254 * a specific union operation.
259 * IBF key for the entry, derived from the current salt.
261 struct IBF_Key ibf_key;
264 * The actual element associated with the key
266 struct ElementEntry *element;
269 * Element that collides with this element
272 struct KeyEntry *next_colliding;
276 * Used as a closure for sending elements
277 * with a specific IBF key.
279 struct SendElementClosure
282 * The IBF key whose matching elements should be
285 struct IBF_Key ibf_key;
288 * Operation for which the elements
291 struct UnionEvaluateOperation *eo;
/**
 * Extra state required for efficient set union.
 */
struct UnionState
{
  /**
   * The strata estimator is only generated once for
   * each set.
   * The IBF keys are derived from the element hashes with
   * salt=0.
   */
  struct StrataEstimator *se;

  /**
   * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
   */
  struct GNUNET_CONTAINER_MultiHashMap *elements;

  /**
   * Evaluate operations are held in
   * a linked list.
   */
  struct UnionEvaluateOperation *ops_head;

  /**
   * Evaluate operations are held in
   * a linked list.
   */
  struct UnionEvaluateOperation *ops_tail;

  /**
   * Current generation, that is, number of
   * previously executed operations on this set
   */
  unsigned int current_generation;
};
336 * Destroy a union operation, and free all resources
337 * associated with it.
339 * @param eo the union operation to destroy
342 destroy_union_operation (struct UnionEvaluateOperation *eo)
344 GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
345 eo->set->state.u->ops_tail,
348 /* FIXME: free and destroy everything else */
353 * Inform the client that the union operation has failed,
354 * and proceed to destroy the evaluate operation.
356 * @param eo the union operation to fail
359 fail_union_operation (struct UnionEvaluateOperation *eo)
361 struct GNUNET_MQ_Message *mqm;
362 struct ResultMessage *msg;
364 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
365 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
366 msg->request_id = eo->request_id;
367 GNUNET_MQ_send (eo->set->client_mq, mqm);
368 destroy_union_operation (eo);
373 * Derive the IBF key from a hash code and
376 * @param src the hash code
377 * @param salt salt to use
378 * @return the derived IBF key
380 static struct IBF_Key
381 get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
385 GNUNET_CRYPTO_hkdf (&key, sizeof (key),
386 GCRY_MD_SHA512, GCRY_MD_SHA256,
388 &salt, sizeof (salt),
395 * Send a request for the evaluate operation to a remote peer
397 * @param eo operation with the other peer
400 send_operation_request (struct UnionEvaluateOperation *eo)
402 struct GNUNET_MQ_Message *mqm;
403 struct OperationRequestMessage *msg;
405 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST);
406 if (NULL != eo->context_msg)
407 if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size)))
409 /* the context message is too large */
410 _GSS_client_disconnect (eo->set->client);
411 GNUNET_MQ_discard (mqm);
415 msg->operation = eo->operation;
416 msg->app_id = eo->app_id;
417 GNUNET_MQ_send (eo->mq, mqm);
419 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
424 * Iterator to create the mapping between ibf keys
425 * and element entries.
428 * @param key current key code
429 * @param value value in the hash map
430 * @return GNUNET_YES if we should continue to
435 insert_element_iterator (void *cls,
439 struct KeyEntry *const new_k = cls;
440 struct KeyEntry *old_k = value;
442 GNUNET_assert (NULL != old_k);
445 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
447 new_k->next_colliding = old_k;
448 old_k->next_colliding = new_k;
451 old_k = old_k->next_colliding;
452 } while (NULL != old_k);
458 insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
461 struct IBF_Key ibf_key;
464 ibf_key = get_ibf_key (&ee->element_hash, eo->salt);
465 k = GNUNET_new (struct KeyEntry);
467 k->ibf_key = ibf_key;
468 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
469 (uint32_t) ibf_key.key_val,
470 insert_element_iterator, k);
471 /* was the element inserted into a colliding bucket? */
472 if (GNUNET_SYSERR == ret)
474 GNUNET_assert (NULL != k->next_colliding);
477 GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
478 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
479 if (NULL != eo->local_ibf)
480 ibf_insert (eo->local_ibf, ibf_key);
485 prepare_ibf_iterator (void *cls,
489 struct InvertibleBloomFilter *ibf = cls;
490 struct KeyEntry *ke = value;
492 ibf_insert (ibf, ke->ibf_key);
498 init_key_to_element_iterator (void *cls,
499 const struct GNUNET_HashCode *key,
502 struct UnionEvaluateOperation *eo = cls;
503 struct ElementEntry *e = value;
505 /* make sure that the element belongs to the set at the time
506 * of creating the operation */
507 if ( (e->generation_added > eo->generation_created) ||
508 ( (GNUNET_YES == e->removed) &&
509 (e->generation_removed < eo->generation_created)))
512 insert_element (eo, e);
518 prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
520 if (NULL == eo->key_to_element)
523 len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
524 eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len);
525 GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
526 init_key_to_element_iterator, eo);
528 if (NULL != eo->local_ibf)
529 ibf_destroy (eo->local_ibf);
530 eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
531 GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
532 prepare_ibf_iterator, eo->local_ibf);
537 * Send an ibf of appropriate size.
539 * @param eo the union operation
540 * @param ibf_order order of the ibf to send, size=2^order
543 send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
545 unsigned int buckets_sent = 0;
546 struct InvertibleBloomFilter *ibf;
548 prepare_ibf (eo, 1<<ibf_order);
552 while (buckets_sent < (1 << ibf_order))
554 unsigned int buckets_in_message;
555 struct GNUNET_MQ_Message *mqm;
556 struct IBFMessage *msg;
558 buckets_in_message = (1 << ibf_order) - buckets_sent;
559 /* limit to maximum */
560 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
561 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
563 mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
564 GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
565 msg->order = htons (ibf_order);
566 msg->offset = htons (buckets_sent);
567 ibf_write_slice (ibf, buckets_sent,
568 buckets_in_message, &msg[1]);
569 buckets_sent += buckets_in_message;
570 GNUNET_MQ_send (eo->mq, mqm);
573 eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
578 * Send a strata estimator to the remote peer.
580 * @param eo the union operation with the remote peer
583 send_strata_estimator (struct UnionEvaluateOperation *eo)
585 struct GNUNET_MQ_Message *mqm;
586 struct GNUNET_MessageHeader *strata_msg;
588 mqm = GNUNET_MQ_msg_header_extra (strata_msg,
589 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
590 GNUNET_MESSAGE_TYPE_SET_P2P_SE);
591 strata_estimator_write (eo->set->state.u->se, &strata_msg[1]);
592 GNUNET_MQ_send (eo->mq, mqm);
593 eo->phase = PHASE_EXPECT_IBF;
597 get_order_from_difference (unsigned int diff)
599 unsigned int ibf_order;
602 while ((1<<ibf_order) < (2 * diff))
604 if (ibf_order > MAX_IBF_ORDER)
605 ibf_order = MAX_IBF_ORDER;
611 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
613 struct UnionEvaluateOperation *eo = cls;
614 struct StrataEstimator *remote_se;
617 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se\n");
619 if (eo->phase != PHASE_EXPECT_SE)
621 fail_union_operation (eo);
625 remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
627 strata_estimator_read (&mh[1], remote_se);
628 GNUNET_assert (NULL != eo->se);
629 diff = strata_estimator_difference (remote_se, eo->se);
630 strata_estimator_destroy (remote_se);
631 strata_estimator_destroy (eo->se);
633 send_ibf (eo, get_order_from_difference (diff));
639 send_element_iterator (void *cls,
643 struct SendElementClosure *sec = cls;
644 struct IBF_Key ibf_key = sec->ibf_key;
645 struct UnionEvaluateOperation *eo = sec->eo;
646 struct KeyEntry *ke = value;
648 if (ke->ibf_key.key_val != ibf_key.key_val)
652 const struct GNUNET_SET_Element *const element = &ke->element->element;
653 struct GNUNET_MQ_Message *mqm;
655 GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
656 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
657 if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
660 GNUNET_MQ_discard (mqm);
663 GNUNET_MQ_send (eo->mq, mqm);
669 * Send all elements that have the specified IBF key
670 * to the remote peer of the union operation
672 * @param eo union operation
673 * @param ibf_key IBF key of interest
676 send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key)
678 struct SendElementClosure send_cls;
680 send_cls.ibf_key = ibf_key;
682 GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
683 send_element_iterator, &send_cls);
689 * Decode which elements are missing on each side, and
690 * send the appropriate elemens and requests
692 * @param eo union operation
695 decode_and_send (struct UnionEvaluateOperation *eo)
699 struct InvertibleBloomFilter *diff_ibf;
701 GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
703 prepare_ibf (eo, eo->remote_ibf->size);
704 diff_ibf = ibf_dup (eo->local_ibf);
705 ibf_subtract (diff_ibf, eo->remote_ibf);
711 res = ibf_decode (diff_ibf, &side, &key);
712 if (GNUNET_SYSERR == res)
714 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "decoding failed, sending larger ibf (size %u)\n",
716 send_ibf (eo, diff_ibf->size * 2);
717 ibf_destroy (diff_ibf);
720 if (GNUNET_NO == res)
722 struct GNUNET_MQ_Message *mqm;
724 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
725 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
726 GNUNET_MQ_send (eo->mq, mqm);
731 send_elements_for_key (eo, key);
735 struct GNUNET_MQ_Message *mqm;
736 struct GNUNET_MessageHeader *msg;
738 /* FIXME: before sending the request, check if we may just have the element */
739 /* FIXME: merge multiple requests */
740 mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
741 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
742 *(struct IBF_Key *) &msg[1] = key;
743 GNUNET_MQ_send (eo->mq, mqm);
750 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
752 struct UnionEvaluateOperation *eo = cls;
753 struct IBFMessage *msg = (struct IBFMessage *) mh;
754 unsigned int buckets_in_message;
756 if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
757 (eo->phase == PHASE_EXPECT_IBF) )
759 eo->phase = PHASE_EXPECT_IBF_CONT;
760 GNUNET_assert (NULL == eo->remote_ibf);
761 eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
762 if (0 != ntohs (msg->offset))
765 fail_union_operation (eo);
768 else if (eo->phase == PHASE_EXPECT_IBF_CONT)
770 if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
771 (1<<msg->order != eo->remote_ibf->size) )
774 fail_union_operation (eo);
778 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
780 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
783 fail_union_operation (eo);
786 ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
787 eo->ibf_buckets_received += buckets_in_message;
789 if (eo->ibf_buckets_received == eo->remote_ibf->size)
791 eo->phase = PHASE_EXPECT_ELEMENTS;
792 decode_and_send (eo);
798 * Send an element to the client of the operations's set.
800 * @param eo union operation
801 * @param element element to send
804 send_client_element (struct UnionEvaluateOperation *eo,
805 struct GNUNET_SET_Element *element)
807 struct GNUNET_MQ_Message *mqm;
808 struct ResultMessage *rm;
810 GNUNET_assert (0 != eo->request_id);
811 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
812 if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
814 GNUNET_MQ_discard (mqm);
819 GNUNET_MQ_send (eo->mq, mqm);
824 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
826 struct UnionEvaluateOperation *eo = cls;
827 struct ElementEntry *ee;
828 uint16_t element_size;
830 if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
831 (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
833 fail_union_operation (eo);
837 element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
838 ee = GNUNET_malloc (sizeof *eo + element_size);
839 ee->element.data = &ee[1];
840 memcpy (ee->element.data, &mh[1], element_size);
841 ee->remote = GNUNET_YES;
843 insert_element (eo, ee);
844 send_client_element (eo, &ee->element);
849 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
851 struct UnionEvaluateOperation *eo = cls;
852 struct IBF_Key *ibf_key;
853 unsigned int num_keys;
855 /* look up elements and send them */
856 if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
858 fail_union_operation (eo);
863 num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
865 if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
867 fail_union_operation (eo);
872 ibf_key = (struct IBF_Key *) &mh[1];
873 while (0 != num_keys--)
875 send_elements_for_key (eo, *ibf_key);
882 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
884 struct UnionEvaluateOperation *eo = cls;
886 if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
888 /* we got all requests, but still have to send our elements as response */
889 struct GNUNET_MQ_Message *mqm;
891 eo->phase = PHASE_FINISHED;
892 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
893 GNUNET_MQ_send (eo->mq, mqm);
896 if (eo->phase == PHASE_EXPECT_ELEMENTS)
899 eo->phase = PHASE_FINISHED;
903 fail_union_operation (eo);
908 * The handlers array, used for both evaluate and accept
910 static const struct GNUNET_MQ_Handler union_handlers[] = {
911 {handle_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS},
912 {handle_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_P2P_SE},
913 {handle_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_P2P_IBF},
914 {handle_p2p_element_requests, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS},
915 {handle_p2p_done, GNUNET_MESSAGE_TYPE_SET_P2P_DONE},
916 GNUNET_MQ_HANDLERS_END
921 * Functions of this type will be called when a stream is established
923 * @param cls the closure from GNUNET_STREAM_open
924 * @param socket socket to use to communicate with the
925 * other side (read/write)
928 stream_open_cb (void *cls,
929 struct GNUNET_STREAM_Socket *socket)
931 struct UnionEvaluateOperation *eo = cls;
933 GNUNET_assert (NULL == eo->mq);
934 GNUNET_assert (socket == eo->socket);
936 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "open cb successful\n");
938 eo->mq = GNUNET_MQ_queue_for_stream_socket (eo->socket,
940 /* we started the operation, thus we have to send the operation request */
941 send_operation_request (eo);
942 eo->phase = PHASE_EXPECT_SE;
948 _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set)
950 struct UnionEvaluateOperation *eo;
952 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation\n");
954 eo = GNUNET_new (struct UnionEvaluateOperation);
958 GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET,
960 GNUNET_STREAM_OPTION_END);
965 _GSS_union_accept (struct AcceptMessage *m, struct Set *set,
966 struct Incoming *incoming)
968 struct UnionEvaluateOperation *eo;
970 eo = GNUNET_new (struct UnionEvaluateOperation);
971 eo->generation_created = set->state.u->current_generation++;
973 eo->peer = incoming->peer;
974 eo->app_id = incoming->app_id;
975 eo->salt = ntohs (incoming->salt);
976 eo->request_id = m->request_id;
978 eo->mq = incoming->mq;
979 /* the peer's socket is now ours, we'll receive all messages */
980 GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo);
981 /* kick of the operation */
982 send_strata_estimator (eo);
987 _GSS_union_set_create (void)
991 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set created\n");
993 set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
994 set->state.u = (struct UnionState *) &set[1];
995 set->operation = GNUNET_SET_OPERATION_UNION;
996 set->state.u->se = strata_estimator_create (SE_STRATA_COUNT,
997 SE_IBF_SIZE, SE_IBF_HASH_NUM);
1003 _GSS_union_add (struct ElementMessage *m, struct Set *set)
1005 struct ElementEntry *ee;
1006 struct ElementEntry *ee_dup;
1007 uint16_t element_size;
1009 element_size = ntohs (m->header.size) - sizeof *m;
1010 ee = GNUNET_malloc (element_size + sizeof *ee);
1011 ee->element.size = element_size;
1012 ee->element.data = &ee[1];
1013 memcpy (ee->element.data, &m[1], element_size);
1014 GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
1015 ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash);
1018 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
1022 GNUNET_CONTAINER_multihashmap_put (set->state.u->elements, &ee->element_hash, ee,
1023 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1024 strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash, 0));
1029 * Remove the element given in the element message from the set.
1030 * Only marks the element as removed, so that older set operations can still exchange it.
1032 * @param m message with the element
1033 * @param set set to remove the element from
1036 _GSS_union_remove (struct ElementMessage *m, struct Set *set)
1038 struct GNUNET_HashCode hash;
1039 struct ElementEntry *ee;
1041 GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash);
1043 ee = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &hash);
1046 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
1049 if (GNUNET_YES == ee->removed)
1051 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
1054 ee->removed = GNUNET_YES;
1055 ee->generation_removed = set->state.u->current_generation;