2 This file is part of GNUnet
3 (C) 2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
21 * @file set/gnunet-service-set_union.c
22 * @brief two-peer set operations
23 * @author Florian Dold
26 #include "gnunet_util_lib.h"
27 #include "gnunet-service-set.h"
29 #include "gnunet-service-set_union_strata_estimator.h"
30 #include "gnunet-service-set_protocol.h"
35 * Number of IBFs in a strata estimator.
37 #define SE_STRATA_COUNT 32
39 * Size of the IBFs in the strata estimator.
41 #define SE_IBF_SIZE 80
43 * hash num parameter for the difference digests and strata estimators
45 #define SE_IBF_HASH_NUM 4
48 * Number of buckets that can be transmitted in one message.
50 #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
53 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
54 * Choose this value so that computing the IBF is still cheaper
55 * than transmitting all values.
57 #define MAX_IBF_ORDER (16)
60 * Number of buckets used in the ibf per estimated
67 * Current phase we are in for a union operation.
69 enum UnionOperationPhase
72 * We sent the request message, and expect a strata estimator
77 * We sent the strata estimator, and expect an IBF. This phase is entered once
78 * upon initialization and later via PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
80 * After receiving the complete IBF, we enter PHASE_EXPECT_ELEMENTS
85 * Continuation for multi part IBFs.
87 PHASE_EXPECT_IBF_CONT,
90 * We are sending request and elements,
91 * and thus only expect elements from the other peer.
93 * We are currently decoding an IBF until it can no longer be decoded,
94 * we currently send requests and expect elements
95 * The remote peer is in PHASE_EXPECT_ELEMENTS_AND_REQUESTS
97 PHASE_EXPECT_ELEMENTS,
100 * We are expecting elements and requests, and send
101 * requested elements back to the other peer.
103 * We are in this phase if we have SENT an IBF for the remote peer to decode.
104 * We expect requests, send elements or could receive an new IBF, which takes
105 * us via PHASE_EXPECT_IBF to phase PHASE_EXPECT_ELEMENTS
107 * The remote peer is thus in:
108 * PHASE_EXPECT_ELEMENTS
110 PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
113 * The protocol is over.
114 * Results may still have to be sent to the client.
121 * State of an evaluate operation with another peer.
123 struct OperationState
127 * Copy of the set's strata estimator at the time of
128 * creation of this operation
130 struct StrataEstimator *se;
133 * The ibf we currently receive
135 struct InvertibleBloomFilter *remote_ibf;
138 * IBF of the set's element.
140 struct InvertibleBloomFilter *local_ibf;
143 * Maps IBF-Keys (specific to the current salt) to elements.
144 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
145 * Colliding IBF-Keys are linked.
147 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
150 * Iterator for sending elements on the key to element mapping to the client.
152 struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
155 * Current state of the operation.
157 enum UnionOperationPhase phase;
160 * Did we send the client that we are done?
162 int client_done_sent;
165 * Number of ibf buckets received
167 unsigned int ibf_buckets_received;
173 * The key entry is used to associate an ibf key with
179 * IBF key for the entry, derived from the current salt.
181 struct IBF_Key ibf_key;
184 * The actual element associated with the key.
186 struct ElementEntry *element;
189 * Element that collides with this element
190 * on the ibf key. All colliding entries must have the same ibf key.
192 struct KeyEntry *next_colliding;
197 * Used as a closure for sending elements
198 * with a specific IBF key.
200 struct SendElementClosure
203 * The IBF key whose matching elements should be
206 struct IBF_Key ibf_key;
209 * Operation for which the elements
212 struct Operation *op;
217 * Extra state required for efficient set union.
222 * The strata estimator is only generated once for
224 * The IBF keys are derived from the element hashes with
227 struct StrataEstimator *se;
232 * Iterator over hash map entries.
235 * @param key current key code
236 * @param value value in the hash map
237 * @return #GNUNET_YES if we should continue to
242 destroy_key_to_element_iter (void *cls,
246 struct KeyEntry *k = value;
247 /* destroy the linked list of colliding ibf key entries */
250 struct KeyEntry *k_tmp = k;
251 k = k->next_colliding;
252 if (GNUNET_YES == k_tmp->element->remote)
254 GNUNET_free (k_tmp->element);
255 k_tmp->element = NULL;
264 * Destroy the union operation. Only things specific to the union operation are destroyed.
266 * @param op union operation to destroy
269 union_op_cancel (struct Operation *op)
271 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
272 /* check if the op was canceled twice */
273 GNUNET_assert (NULL != op->state);
274 if (NULL != op->state->remote_ibf)
276 ibf_destroy (op->state->remote_ibf);
277 op->state->remote_ibf = NULL;
279 if (NULL != op->state->local_ibf)
281 ibf_destroy (op->state->local_ibf);
282 op->state->local_ibf = NULL;
284 if (NULL != op->state->se)
286 strata_estimator_destroy (op->state->se);
287 op->state->se = NULL;
289 if (NULL != op->state->key_to_element)
291 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, destroy_key_to_element_iter, NULL);
292 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
293 op->state->key_to_element = NULL;
295 GNUNET_free (op->state);
297 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
302 * Inform the client that the union operation has failed,
303 * and proceed to destroy the evaluate operation.
305 * @param op the union operation to fail
308 fail_union_operation (struct Operation *op)
310 struct GNUNET_MQ_Envelope *ev;
311 struct GNUNET_SET_ResultMessage *msg;
313 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "union operation failed\n");
315 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
316 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
317 msg->request_id = htonl (op->spec->client_request_id);
318 msg->element_type = htons (0);
319 GNUNET_MQ_send (op->spec->set->client_mq, ev);
320 _GSS_operation_destroy (op, GNUNET_YES);
325 * Derive the IBF key from a hash code and
328 * @param src the hash code
329 * @param salt salt to use
330 * @return the derived IBF key
332 static struct IBF_Key
333 get_ibf_key (const struct GNUNET_HashCode *src, uint16_t salt)
337 GNUNET_CRYPTO_hkdf (&key, sizeof (key),
338 GCRY_MD_SHA512, GCRY_MD_SHA256,
340 &salt, sizeof (salt),
347 * Iterator to create the mapping between ibf keys
348 * and element entries.
351 * @param key current key code
352 * @param value value in the hash map
353 * @return #GNUNET_YES if we should continue to
358 op_register_element_iterator (void *cls,
362 struct KeyEntry *const new_k = cls;
363 struct KeyEntry *old_k = value;
365 GNUNET_assert (NULL != old_k);
366 /* check if our ibf key collides with the ibf key in the existing entry */
367 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
369 /* insert the the new key in the collision chain */
370 new_k->next_colliding = old_k->next_colliding;
371 old_k->next_colliding = new_k;
372 /* signal to the caller that we were able to insert into a colliding bucket */
380 * Iterator to create the mapping between ibf keys
381 * and element entries.
384 * @param key current key code
385 * @param value value in the hash map
386 * @return #GNUNET_YES if we should continue to
391 op_has_element_iterator (void *cls,
395 struct GNUNET_HashCode *element_hash = cls;
396 struct KeyEntry *k = value;
398 GNUNET_assert (NULL != k);
401 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, element_hash))
403 k = k->next_colliding;
410 * Determine whether the given element is already in the operation's element
413 * @param op operation that should be tested for 'element_hash'
414 * @param element_hash hash of the element to look for
415 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
418 op_has_element (struct Operation *op, const struct GNUNET_HashCode *element_hash)
421 struct IBF_Key ibf_key;
423 ibf_key = get_ibf_key (element_hash, op->spec->salt);
424 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
425 (uint32_t) ibf_key.key_val,
426 op_has_element_iterator, (void *) element_hash);
428 /* was the iteration aborted because we found the element? */
429 if (GNUNET_SYSERR == ret)
436 * Insert an element into the union operation's
437 * key-to-element mapping. Takes ownership of 'ee'.
438 * Note that this does not insert the element in the set,
439 * only in the operation's key-element mapping.
440 * This is done to speed up re-tried operations, if some elements
441 * were transmitted, and then the IBF fails to decode.
443 * @param op the union operation
444 * @param ee the element entry
447 op_register_element (struct Operation *op,
448 struct ElementEntry *ee)
451 struct IBF_Key ibf_key;
454 ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
455 k = GNUNET_new (struct KeyEntry);
457 k->ibf_key = ibf_key;
458 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
459 (uint32_t) ibf_key.key_val,
460 op_register_element_iterator, k);
462 /* was the element inserted into a colliding bucket? */
463 if (GNUNET_SYSERR == ret)
466 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t) ibf_key.key_val, k,
467 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
472 * Insert a key into an ibf.
476 * @param value the key entry to get the key from
479 prepare_ibf_iterator (void *cls,
483 struct InvertibleBloomFilter *ibf = cls;
484 struct KeyEntry *ke = value;
486 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting %x into ibf\n", ke->ibf_key.key_val);
488 ibf_insert (ibf, ke->ibf_key);
494 * Iterator for initializing the
495 * key-to-element mapping of a union operation
497 * @param cls the union operation
499 * @param value the element entry to insert
500 * into the key-to-element mapping
501 * @return GNUNET_YES to continue iterating,
505 init_key_to_element_iterator (void *cls,
506 const struct GNUNET_HashCode *key,
509 struct Operation *op = cls;
510 struct ElementEntry *e = value;
512 /* make sure that the element belongs to the set at the time
513 * of creating the operation */
514 if ( (e->generation_added > op->generation_created) ||
515 ( (GNUNET_YES == e->removed) &&
516 (e->generation_removed < op->generation_created)))
519 GNUNET_assert (GNUNET_NO == e->remote);
521 op_register_element (op, e);
527 * Create an ibf with the operation's elements
528 * of the specified size
530 * @param op the union operation
531 * @param size size of the ibf to create
534 prepare_ibf (struct Operation *op, uint16_t size)
536 if (NULL == op->state->key_to_element)
539 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements);
540 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
541 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
542 init_key_to_element_iterator, op);
544 if (NULL != op->state->local_ibf)
545 ibf_destroy (op->state->local_ibf);
546 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
547 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
548 prepare_ibf_iterator, op->state->local_ibf);
553 * Send an ibf of appropriate size.
555 * @param op the union operation
556 * @param ibf_order order of the ibf to send, size=2^order
559 send_ibf (struct Operation *op, uint16_t ibf_order)
561 unsigned int buckets_sent = 0;
562 struct InvertibleBloomFilter *ibf;
564 prepare_ibf (op, 1<<ibf_order);
566 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
568 ibf = op->state->local_ibf;
570 while (buckets_sent < (1 << ibf_order))
572 unsigned int buckets_in_message;
573 struct GNUNET_MQ_Envelope *ev;
574 struct IBFMessage *msg;
576 buckets_in_message = (1 << ibf_order) - buckets_sent;
577 /* limit to maximum */
578 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
579 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
581 ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
582 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
584 msg->order = ibf_order;
585 msg->offset = htons (buckets_sent);
586 ibf_write_slice (ibf, buckets_sent,
587 buckets_in_message, &msg[1]);
588 buckets_sent += buckets_in_message;
589 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
590 buckets_in_message, buckets_sent, 1<<ibf_order);
591 GNUNET_MQ_send (op->mq, ev);
594 op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
599 * Send a strata estimator to the remote peer.
601 * @param op the union operation with the remote peer
604 send_strata_estimator (struct Operation *op)
606 struct GNUNET_MQ_Envelope *ev;
607 struct GNUNET_MessageHeader *strata_msg;
609 ev = GNUNET_MQ_msg_header_extra (strata_msg,
610 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
611 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE);
612 strata_estimator_write (op->state->se, &strata_msg[1]);
613 GNUNET_MQ_send (op->mq, ev);
614 op->state->phase = PHASE_EXPECT_IBF;
615 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
620 * Compute the necessary order of an ibf
621 * from the size of the symmetric set difference.
623 * @param diff the difference
624 * @return the required size of the ibf
627 get_order_from_difference (unsigned int diff)
629 unsigned int ibf_order;
632 while ((1<<ibf_order) < (IBF_ALPHA * diff) || (1<<ibf_order) < SE_IBF_HASH_NUM)
634 if (ibf_order > MAX_IBF_ORDER)
635 ibf_order = MAX_IBF_ORDER;
641 * Handle a strata estimator from a remote peer
643 * @param cls the union operation
644 * @param mh the message
647 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
649 struct Operation *op = cls;
650 struct StrataEstimator *remote_se;
653 if (op->state->phase != PHASE_EXPECT_SE)
655 fail_union_operation (op);
659 remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
661 strata_estimator_read (&mh[1], remote_se);
662 GNUNET_assert (NULL != op->state->se);
663 diff = strata_estimator_difference (remote_se, op->state->se);
664 strata_estimator_destroy (remote_se);
665 strata_estimator_destroy (op->state->se);
666 op->state->se = NULL;
667 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n",
668 diff, 1<<get_order_from_difference (diff));
669 send_ibf (op, get_order_from_difference (diff));
675 * Iterator to send elements to a remote peer
677 * @param cls closure with the element key and the union operation
679 * @param value the key entry
682 send_element_iterator (void *cls,
686 struct SendElementClosure *sec = cls;
687 struct IBF_Key ibf_key = sec->ibf_key;
688 struct Operation *op = sec->op;
689 struct KeyEntry *ke = value;
691 if (ke->ibf_key.key_val != ibf_key.key_val)
695 const struct GNUNET_SET_Element *const element = &ke->element->element;
696 struct GNUNET_MQ_Envelope *ev;
697 struct GNUNET_MessageHeader *mh;
699 GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
700 ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
703 /* element too large */
707 memcpy (&mh[1], element->data, element->size);
708 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n",
709 GNUNET_h2s (&ke->element->element_hash));
710 GNUNET_MQ_send (op->mq, ev);
711 ke = ke->next_colliding;
717 * Send all elements that have the specified IBF key
718 * to the remote peer of the union operation
720 * @param op union operation
721 * @param ibf_key IBF key of interest
724 send_elements_for_key (struct Operation *op, struct IBF_Key ibf_key)
726 struct SendElementClosure send_cls;
728 send_cls.ibf_key = ibf_key;
730 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
731 (uint32_t) ibf_key.key_val,
732 &send_element_iterator, &send_cls);
737 * Decode which elements are missing on each side, and
738 * send the appropriate elemens and requests
740 * @param op union operation
743 decode_and_send (struct Operation *op)
746 struct IBF_Key last_key;
748 unsigned int num_decoded;
749 struct InvertibleBloomFilter *diff_ibf;
751 GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase);
753 prepare_ibf (op, op->state->remote_ibf->size);
754 diff_ibf = ibf_dup (op->state->local_ibf);
755 ibf_subtract (diff_ibf, op->state->remote_ibf);
757 ibf_destroy (op->state->remote_ibf);
758 op->state->remote_ibf = NULL;
760 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size);
763 last_key.key_val = 0;
768 int cycle_detected = GNUNET_NO;
772 res = ibf_decode (diff_ibf, &side, &key);
773 if (res == GNUNET_OK)
775 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n",
778 if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val))
780 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n",
781 num_decoded, diff_ibf->size);
782 cycle_detected = GNUNET_YES;
785 if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected))
789 while (1<<next_order < diff_ibf->size)
792 if (next_order <= MAX_IBF_ORDER)
794 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
795 "decoding failed, sending larger ibf (size %u)\n",
797 send_ibf (op, next_order);
801 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
802 "set union failed: reached ibf limit\n");
806 if (GNUNET_NO == res)
808 struct GNUNET_MQ_Envelope *ev;
810 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
811 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
812 GNUNET_MQ_send (op->mq, ev);
817 send_elements_for_key (op, key);
821 struct GNUNET_MQ_Envelope *ev;
822 struct GNUNET_MessageHeader *msg;
824 /* It may be nice to merge multiple requests, but with cadet's corking it is not worth
825 * the effort additional complexity. */
826 ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
827 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
829 *(struct IBF_Key *) &msg[1] = key;
830 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n");
831 GNUNET_MQ_send (op->mq, ev);
838 ibf_destroy (diff_ibf);
843 * Handle an IBF message from a remote peer.
845 * @param cls the union operation
846 * @param mh the header of the message
849 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
851 struct Operation *op = cls;
852 struct IBFMessage *msg = (struct IBFMessage *) mh;
853 unsigned int buckets_in_message;
855 if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
856 (op->state->phase == PHASE_EXPECT_IBF) )
858 op->state->phase = PHASE_EXPECT_IBF_CONT;
859 GNUNET_assert (NULL == op->state->remote_ibf);
860 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
861 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
862 op->state->ibf_buckets_received = 0;
863 if (0 != ntohs (msg->offset))
866 fail_union_operation (op);
870 else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
872 if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
873 (1<<msg->order != op->state->remote_ibf->size) )
876 fail_union_operation (op);
881 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
883 if (0 == buckets_in_message)
886 fail_union_operation (op);
890 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
893 fail_union_operation (op);
897 ibf_read_slice (&msg[1], op->state->ibf_buckets_received, buckets_in_message, op->state->remote_ibf);
898 op->state->ibf_buckets_received += buckets_in_message;
900 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
902 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
903 op->state->phase = PHASE_EXPECT_ELEMENTS;
904 decode_and_send (op);
910 * Send a result message to the client indicating
911 * that there is a new element.
913 * @param op union operation
914 * @param element element to send
917 send_client_element (struct Operation *op,
918 struct GNUNET_SET_Element *element)
920 struct GNUNET_MQ_Envelope *ev;
921 struct GNUNET_SET_ResultMessage *rm;
923 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
924 GNUNET_assert (0 != op->spec->client_request_id);
925 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
928 GNUNET_MQ_discard (ev);
932 rm->result_status = htons (GNUNET_SET_STATUS_OK);
933 rm->request_id = htonl (op->spec->client_request_id);
934 rm->element_type = element->element_type;
935 memcpy (&rm[1], element->data, element->size);
936 GNUNET_MQ_send (op->spec->set->client_mq, ev);
941 * Signal to the client that the operation has finished and
942 * destroy the operation.
944 * @param cls operation to destroy
947 send_done_and_destroy (void *cls)
949 struct Operation *op = cls;
950 struct GNUNET_MQ_Envelope *ev;
951 struct GNUNET_SET_ResultMessage *rm;
954 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
955 rm->request_id = htonl (op->spec->client_request_id);
956 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
957 rm->element_type = htons (0);
958 GNUNET_MQ_send (op->spec->set->client_mq, ev);
959 _GSS_operation_destroy (op, GNUNET_YES);
960 if (GNUNET_YES == keep)
966 * Send all remaining elements in the full result iterator.
968 * @param cls operation
971 send_remaining_elements (void *cls)
973 struct Operation *op = cls;
977 res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, NULL, (const void **) &ke);
978 if (GNUNET_NO == res)
980 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
981 send_done_and_destroy (op);
985 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending elements from key entry\n");
989 struct GNUNET_MQ_Envelope *ev;
990 struct GNUNET_SET_ResultMessage *rm;
991 struct GNUNET_SET_Element *element;
992 element = &ke->element->element;
994 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
995 GNUNET_assert (0 != op->spec->client_request_id);
996 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
999 GNUNET_MQ_discard (ev);
1003 rm->result_status = htons (GNUNET_SET_STATUS_OK);
1004 rm->request_id = htonl (op->spec->client_request_id);
1005 rm->element_type = element->element_type;
1006 memcpy (&rm[1], element->data, element->size);
1007 if (ke->next_colliding == NULL)
1009 GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
1010 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1013 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1014 ke = ke->next_colliding;
1020 * Send a result message to the client indicating
1021 * that the operation is over.
1022 * After the result done message has been sent to the client,
1023 * destroy the evaluate operation.
1025 * @param op union operation
1028 finish_and_destroy (struct Operation *op)
1030 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1032 if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
1034 /* prevent that the op is free'd by the tunnel end handler */
1035 op->keep = GNUNET_YES;
1036 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
1037 GNUNET_assert (NULL == op->state->full_result_iter);
1038 op->state->full_result_iter =
1039 GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element);
1040 send_remaining_elements (op);
1043 send_done_and_destroy (op);
1048 * Handle an element message from a remote peer.
1050 * @param cls the union operation
1051 * @param mh the message
1054 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1056 struct Operation *op = cls;
1057 struct ElementEntry *ee;
1058 uint16_t element_size;
1060 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1061 "got element from peer\n");
1063 if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) &&
1064 (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1066 fail_union_operation (op);
1070 element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1071 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1072 memcpy (&ee[1], &mh[1], element_size);
1073 ee->element.size = element_size;
1074 ee->element.data = &ee[1];
1075 ee->remote = GNUNET_YES;
1076 GNUNET_CRYPTO_hash (ee->element.data,
1080 if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1082 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1083 "got existing element from peer\n");
1088 op_register_element (op, ee);
1089 /* only send results immediately if the client wants it */
1090 if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode)
1091 send_client_element (op, &ee->element);
1096 * Handle an element request from a remote peer.
1098 * @param cls the union operation
1099 * @param mh the message
1102 handle_p2p_element_requests (void *cls,
1103 const struct GNUNET_MessageHeader *mh)
1105 struct Operation *op = cls;
1106 struct IBF_Key *ibf_key;
1107 unsigned int num_keys;
1109 /* look up elements and send them */
1110 if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1113 fail_union_operation (op);
1117 num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
1119 if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1122 fail_union_operation (op);
1126 ibf_key = (struct IBF_Key *) &mh[1];
1127 while (0 != num_keys--)
1129 send_elements_for_key (op, *ibf_key);
1136 * Handle a done message from a remote peer
1138 * @param cls the union operation
1139 * @param mh the message
1142 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1144 struct Operation *op = cls;
1145 struct GNUNET_MQ_Envelope *ev;
1147 if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1149 /* we got all requests, but still have to send our elements as response */
1151 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
1152 op->state->phase = PHASE_FINISHED;
1153 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1154 GNUNET_MQ_send (op->mq, ev);
1157 if (op->state->phase == PHASE_EXPECT_ELEMENTS)
1159 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
1160 op->state->phase = PHASE_FINISHED;
1161 finish_and_destroy (op);
1165 fail_union_operation (op);
1170 * Initiate operation to evaluate a set union with a remote peer.
1172 * @param op operation to perform (to be initialized)
1173 * @param opaque_context message to be transmitted to the listener
1174 * to convince him to accept, may be NULL
1177 union_evaluate (struct Operation *op,
1178 const struct GNUNET_MessageHeader *opaque_context)
1180 struct GNUNET_MQ_Envelope *ev;
1181 struct OperationRequestMessage *msg;
1183 op->state = GNUNET_new (struct OperationState);
1184 /* copy the current generation's strata estimator for this operation */
1185 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1186 /* we started the operation, thus we have to send the operation request */
1187 op->state->phase = PHASE_EXPECT_SE;
1188 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1189 "Initiating union operation evaluation\n");
1190 ev = GNUNET_MQ_msg_nested_mh (msg,
1191 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
1195 /* the context message is too large */
1197 GNUNET_SERVER_client_disconnect (op->spec->set->client);
1200 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
1201 msg->app_id = op->spec->app_id;
1202 msg->salt = htonl (op->spec->salt);
1203 GNUNET_MQ_send (op->mq, ev);
1205 if (NULL != opaque_context)
1206 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1207 "sent op request with context message\n");
1209 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1210 "sent op request without context message\n");
1215 * Accept an union operation request from a remote peer.
1216 * Only initializes the private operation state.
1218 * @param op operation that will be accepted as a union operation
1221 union_accept (struct Operation *op)
1223 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
1224 op->state = GNUNET_new (struct OperationState);
1225 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1226 /* kick off the operation */
1227 send_strata_estimator (op);
1232 * Create a new set supporting the union operation
1234 * We maintain one strata estimator per set and then manipulate it over the
1235 * lifetime of the set, as recreating a strata estimator would be expensive.
1237 * @return the newly created set
1239 static struct SetState *
1240 union_set_create (void)
1242 struct SetState *set_state;
1244 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n");
1246 set_state = GNUNET_new (struct SetState);
1247 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1248 SE_IBF_SIZE, SE_IBF_HASH_NUM);
1254 * Add the element from the given element message to the set.
1256 * @param set_state state of the set want to add to
1257 * @param ee the element to add to the set
1260 union_add (struct SetState *set_state, struct ElementEntry *ee)
1262 strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
1267 * Remove the element given in the element message from the set.
1268 * Only marks the element as removed, so that older set operations can still exchange it.
1270 * @param set_state state of the set to remove from
1271 * @param ee set element to remove
1274 union_remove (struct SetState *set_state, struct ElementEntry *ee)
1276 strata_estimator_remove (set_state->se, get_ibf_key (&ee->element_hash, 0));
1281 * Destroy a set that supports the union operation.
1283 * @param set_state the set to destroy
1286 union_set_destroy (struct SetState *set_state)
1288 if (NULL != set_state->se)
1290 strata_estimator_destroy (set_state->se);
1291 set_state->se = NULL;
1293 GNUNET_free (set_state);
1298 * Dispatch messages for a union operation.
1300 * @param op the state of the union evaluate operation
1301 * @param mh the received message
1302 * @return GNUNET_SYSERR if the tunnel should be disconnected,
1303 * GNUNET_OK otherwise
1306 union_handle_p2p_message (struct Operation *op,
1307 const struct GNUNET_MessageHeader *mh)
1309 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
1310 ntohs (mh->type), ntohs (mh->size));
1311 switch (ntohs (mh->type))
1313 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1314 handle_p2p_ibf (op, mh);
1316 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1317 handle_p2p_strata_estimator (op, mh);
1319 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1320 handle_p2p_elements (op, mh);
1322 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1323 handle_p2p_element_requests (op, mh);
1325 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1326 handle_p2p_done (op, mh);
1329 /* something wrong with cadet's message handlers? */
1336 * handler for peer-disconnects, notifies the client
1337 * about the aborted operation in case the op was not concluded
1339 * @param op the destroyed operation
1342 union_peer_disconnect (struct Operation *op)
1344 if (PHASE_FINISHED != op->state->phase)
1346 struct GNUNET_MQ_Envelope *ev;
1347 struct GNUNET_SET_ResultMessage *msg;
1349 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1350 msg->request_id = htonl (op->spec->client_request_id);
1351 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1352 msg->element_type = htons (0);
1353 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1354 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1355 "other peer disconnected prematurely\n");
1356 _GSS_operation_destroy (op, GNUNET_YES);
1359 // else: the session has already been concluded
1360 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1361 if (GNUNET_NO == op->state->client_done_sent)
1362 finish_and_destroy (op);
1367 * Get the table with implementing functions for
1370 * @return the operation specific VTable
1372 const struct SetVT *
1375 static const struct SetVT union_vt = {
1376 .create = &union_set_create,
1377 .msg_handler = &union_handle_p2p_message,
1379 .remove = &union_remove,
1380 .destroy_set = &union_set_destroy,
1381 .evaluate = &union_evaluate,
1382 .accept = &union_accept,
1383 .peer_disconnect = &union_peer_disconnect,
1384 .cancel = &union_op_cancel,