indentation fixes
[oweals/gnunet.git] / src / set / gnunet-service-set_intersection.c
index d654f33e8f10d9245824a5107bfc36430d6eb2ec..258ad64436494fd02d5beca0007026f9c9b12ac1 100644 (file)
@@ -1,10 +1,10 @@
 /*
       This file is part of GNUnet
 /*
       This file is part of GNUnet
-      (C) 2013 Christian Grothoff (and other contributing authors)
+      Copyright (C) 2013, 2014 GNUnet e.V.
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
-      by the Free Software Foundation; either version 2, or (at your
+      by the Free Software Foundation; either version 3, or (at your
       option) any later version.
 
       GNUnet is distributed in the hope that it will be useful, but
       option) any later version.
 
       GNUnet is distributed in the hope that it will be useful, but
 
       You should have received a copy of the GNU General Public License
       along with GNUnet; see the file COPYING.  If not, write to the
 
       You should have received a copy of the GNU General Public License
       along with GNUnet; see the file COPYING.  If not, write to the
-      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-      Boston, MA 02111-1307, USA.
+      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+      Boston, MA 02110-1301, USA.
 */
 */
-
 /**
  * @file set/gnunet-service-set_intersection.c
  * @brief two-peer set intersection
 /**
  * @file set/gnunet-service-set_intersection.c
  * @brief two-peer set intersection
- * @author Christian M. Fuchs
+ * @author Christian Fuchs
+ * @author Christian Grothoff
  */
  */
-
-
+#include "platform.h"
+#include "gnunet_util_lib.h"
 #include "gnunet-service-set.h"
 #include "gnunet-service-set.h"
-#include "gnunet_container_lib.h"
-#include "gnunet_crypto_lib.h"
-#include "ibf.h"
-#include "strata_estimator.h"
-#include "set_protocol.h"
+#include "gnunet_block_lib.h"
+#include "gnunet-service-set_protocol.h"
 #include <gcrypt.h>
 
 
 /**
 #include <gcrypt.h>
 
 
 /**
- * Number of IBFs in a strata estimator.
- */
-#define SE_STRATA_COUNT 32
-/**
- * Size of the IBFs in the strata estimator.
- */
-#define SE_IBF_SIZE 80
-/**
- * hash num parameter for the difference digests and strata estimators
- */
-#define SE_IBF_HASH_NUM 3
-
-/**
- * Number of buckets that can be transmitted in one message.
- */
-#define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
-
-/**
- * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
- * Choose this value so that computing the IBF is still cheaper
- * than transmitting all values.
- */
-#define MAX_IBF_ORDER (16)
-
-
-/**
- * Current phase we are in for a union operation
+ * Current phase we are in for a intersection operation.
  */
 enum IntersectionOperationPhase
 {
   /**
  */
 enum IntersectionOperationPhase
 {
   /**
-   * We sent the request message, and expect a strata estimator
-   */
-  PHASE_EXPECT_SE,
-  /**
-   * We sent the strata estimator, and expect an IBF
-   */
-  PHASE_EXPECT_IBF,
-  /**
-   * We know what type of IBF the other peer wants to send us,
-   * and expect the remaining parts
+   * We are just starting.
    */
    */
-  PHASE_EXPECT_IBF_CONT,
+  PHASE_INITIAL,
+
   /**
   /**
-   * We are sending request and elements,
-   * and thus only expect elements from the other peer.
+   * We have send the number of our elements to the other
+   * peer, but did not setup our element set yet.
    */
    */
-  PHASE_EXPECT_ELEMENTS,
+  PHASE_COUNT_SENT,
+
   /**
   /**
-   * We are expecting elements and requests, and send
-   * requested elements back to the other peer.
+   * We have initialized our set and are now reducing it by exchanging
+   * Bloom filters until one party notices the their element hashes
+   * are equal.
    */
    */
-  PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
+  PHASE_BF_EXCHANGE,
+
   /**
   /**
-   * The protocol is over.
-   * Results may still have to be sent to the client.
+   * The protocol is over.  Results may still have to be sent to the
+   * client.
    */
   PHASE_FINISHED
    */
   PHASE_FINISHED
+
 };
 
 
 /**
 };
 
 
 /**
- * State of an evaluate operation
- * with another peer.
+ * State of an evaluate operation with another peer.
  */
  */
-struct IntersectionEvaluateOperation
+struct OperationState
 {
   /**
 {
   /**
-   * Local set the operation is evaluated on.
-   */
-  struct Set *set;
-
-  /**
-   * Peer with the remote set
-   */
-  struct GNUNET_PeerIdentity peer;
-
-  /**
-   * Application-specific identifier
-   */
-  struct GNUNET_HashCode app_id;
-
-  /**
-   * Context message, given to us
-   * by the client, may be NULL.
+   * The bf we currently receive
    */
    */
-  struct GNUNET_MessageHeader *context_msg;
+  struct GNUNET_CONTAINER_BloomFilter *remote_bf;
 
   /**
 
   /**
-   * Tunnel context for the peer we
-   * evaluate the union operation with.
+   * BF of the set's element.
    */
    */
-  struct TunnelContext *tc;
+  struct GNUNET_CONTAINER_BloomFilter *local_bf;
 
   /**
 
   /**
-   * Request ID to multiplex set operations to
-   * the client inhabiting the set.
+   * Remaining elements in the intersection operation.
+   * Maps element-id-hashes to 'elements in our set'.
    */
    */
-  uint32_t request_id;
+  struct GNUNET_CONTAINER_MultiHashMap *my_elements;
 
   /**
 
   /**
-   * Number of ibf buckets received
+   * Iterator for sending the final set of @e my_elements to the client.
    */
    */
-  unsigned int ibf_buckets_received;
+  struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
 
   /**
 
   /**
-   * Copy of the set's strata estimator at the time of
-   * creation of this operation
+   * Evaluate operations are held in a linked list.
    */
    */
-  struct StrataEstimator *se;
+  struct OperationState *next;
 
   /**
 
   /**
-   * The ibf we currently receive
+   * Evaluate operations are held in a linked list.
    */
    */
-  struct InvertibleBloomFilter *remote_ibf;
+  struct OperationState *prev;
 
   /**
 
   /**
-   * IBF of the set's element.
+   * For multipart BF transmissions, we have to store the
+   * bloomfilter-data until we fully received it.
    */
    */
-  struct InvertibleBloomFilter *local_ibf;
+  char *bf_data;
 
   /**
 
   /**
-   * Maps IBF-Keys (specific to the current salt) to elements.
+   * XOR of the keys of all of the elements (remaining) in my set.
+   * Always updated when elements are added or removed to
+   * @e my_elements.
    */
    */
-  struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
+  struct GNUNET_HashCode my_xor;
 
   /**
 
   /**
-   * Current state of the operation.
+   * XOR of the keys of all of the elements (remaining) in
+   * the other peer's set.  Updated when we receive the
+   * other peer's Bloom filter.
    */
    */
-  enum IntersectionOperationPhase phase;
+  struct GNUNET_HashCode other_xor;
 
   /**
 
   /**
-   * Salt to use for this operation.
+   * How many bytes of @e bf_data are valid?
    */
    */
-  uint16_t salt;
+  uint32_t bf_data_offset;
 
   /**
 
   /**
-   * Generation in which the operation handle
-   * was created.
-   */
-  unsigned int generation_created;
-  
-  /**
-   * Evaluate operations are held in
-   * a linked list.
-   */
-  struct IntersectionEvaluateOperation *next;
-  
-   /**
-   * Evaluate operations are held in
-   * a linked list.
+   * Current element count contained within @e my_elements.
+   * (May differ briefly during initialization.)
    */
    */
-  struct IntersectionEvaluateOperation *prev;
-};
-
+  uint32_t my_element_count;
 
 
-/**
- * Information about the element in a set.
- * All elements are stored in a hash-table
- * from their hash-code to their 'struct Element',
- * so that the remove and add operations are reasonably
- * fast.
- */
-struct ElementEntry
-{
   /**
   /**
-   * The actual element. The data for the element
-   * should be allocated at the end of this struct.
+   * size of the bloomfilter in @e bf_data.
    */
    */
-  struct GNUNET_SET_Element element;
+  uint32_t bf_data_size;
 
   /**
 
   /**
-   * Hash of the element.
-   * Will be used to derive the different IBF keys
-   * for different salts.
+   * size of the bloomfilter
    */
    */
-  struct GNUNET_HashCode element_hash;
+  uint32_t bf_bits_per_element;
 
   /**
 
   /**
-   * Generation the element was added by the client.
-   * Operations of earlier generations will not consider the element.
+   * Salt currently used for BF construction (by us or the other peer,
+   * depending on where we are in the code).
    */
    */
-  unsigned int generation_added;
+  uint32_t salt;
 
   /**
 
   /**
-   * GNUNET_YES if the element has been removed in some generation.
+   * Current state of the operation.
    */
    */
-  int removed;
+  enum IntersectionOperationPhase phase;
 
   /**
 
   /**
-   * Generation the element was removed by the client. 
-   * Operations of later generations will not consider the element.
-   * Only valid if is_removed is GNUNET_YES.
+   * Generation in which the operation handle
+   * was created.
    */
    */
-  unsigned int generation_removed;
+  unsigned int generation_created;
 
   /**
 
   /**
-   * GNUNET_YES if the element is a remote element, and does not belong
-   * to the operation's set.
+   * Did we send the client that we are done?
    */
    */
-  int remote;
+  int client_done_sent;
 };
 
 
 /**
 };
 
 
 /**
- * Entries in the key-to-element map of the union set.
- */
-struct KeyEntry
-{
-  /**
-   * IBF key for the entry, derived from the current salt.
-   */
-  struct IBF_Key ibf_key;
-
-  /**
-   * The actual element associated with the key
-   */
-  struct ElementEntry *element;
-
-  /**
-   * Element that collides with this element
-   * on the ibf key
-   */
-  struct KeyEntry *next_colliding;
-};
-
-/**
- * Used as a closure for sending elements
- * with a specific IBF key.
+ * Extra state required for efficient set intersection.
+ * Merely tracks the total number of elements.
  */
  */
-struct SendElementClosure
+struct SetState
 {
   /**
 {
   /**
-   * The IBF key whose matching elements should be
-   * sent.
-   */
-  struct IBF_Key ibf_key;
-
-  /**
-   * Operation for which the elements
-   * should be sent.
+   * Number of currently valid elements in the set which have not been
+   * removed.
    */
    */
-  struct UnionEvaluateOperation *eo;
+  uint32_t current_set_element_count;
 };
 
 
 /**
 };
 
 
 /**
- * Extra state required for efficient set union.
+ * If applicable in the current operation mode, send a result message
+ * to the client indicating we removed an element.
+ *
+ * @param op intersection operation
+ * @param element element to send
  */
  */
-struct IntersectionState
+static void
+send_client_removed_element (struct Operation *op,
+                             struct GNUNET_SET_Element *element)
 {
 {
-  /**
-   * The strata estimator is only generated once for
-   * each set.
-   * The IBF keys are derived from the element hashes with
-   * salt=0.
-   */
-  struct StrataEstimator *se;
-
-  /**
-   * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
-   */
-  struct GNUNET_CONTAINER_MultiHashMap *elements;
-
-  /**
-   * Evaluate operations are held in
-   * a linked list.
-   */
-  struct UnionEvaluateOperation *ops_head;
-
-  /**
-   * Evaluate operations are held in
-   * a linked list.
-   */
-  struct UnionEvaluateOperation *ops_tail;
-
-  /**
-   * Current generation, that is, number of
-   * previously executed operations on this set
-   */
-  unsigned int current_generation;
-};
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_SET_ResultMessage *rm;
 
 
+  if (GNUNET_SET_RESULT_REMOVED != op->spec->result_mode)
+    return; /* Wrong mode for transmitting removed elements */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending removed element (size %u) to client\n",
+              element->size);
+  GNUNET_assert (0 != op->spec->client_request_id);
+  ev = GNUNET_MQ_msg_extra (rm,
+                            element->size,
+                            GNUNET_MESSAGE_TYPE_SET_RESULT);
+  if (NULL == ev)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  rm->result_status = htons (GNUNET_SET_STATUS_OK);
+  rm->request_id = htonl (op->spec->client_request_id);
+  rm->element_type = element->element_type;
+  GNUNET_memcpy (&rm[1],
+          element->data,
+          element->size);
+  GNUNET_MQ_send (op->spec->set->client_mq,
+                  ev);
+}
 
 
 /**
 
 
 /**
- * Iterator over hash map entries.
+ * Fills the "my_elements" hashmap with all relevant elements.
  *
  *
- * @param cls closure
+ * @param cls the `struct Operation *` we are performing
  * @param key current key code
  * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
+ * @param value the `struct ElementEntry *` from the hash map
+ * @return #GNUNET_YES (we should continue to iterate)
  */
 static int
  */
 static int
-destroy_elements_iterator (void *cls,
-                           const struct GNUNET_HashCode * key,
-                           void *value)
+filtered_map_initialization (void *cls,
+                             const struct GNUNET_HashCode *key,
+                             void *value)
 {
 {
+  struct Operation *op = cls;
   struct ElementEntry *ee = value;
   struct ElementEntry *ee = value;
+  struct GNUNET_HashCode mutated_hash;
 
 
-  GNUNET_free (ee);
-  return GNUNET_YES;
-}
 
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "FIMA called for %s:%u\n",
+              GNUNET_h2s (&ee->element_hash),
+              ee->element.size);
 
 
-/**
- * Destroy the elements belonging to a union set.
- *
- * @param us union state that contains the elements
- */
-static void
-destroy_elements (struct IntersectionState *us)
-{
-  if (NULL == us->elements)
-    return;
-  GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL);
-  GNUNET_CONTAINER_multihashmap_destroy (us->elements);
-  us->elements = NULL;
-}
+  if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Reduced initialization, not starting with %s:%u (wrong generation)\n",
+                GNUNET_h2s (&ee->element_hash),
+                ee->element.size);
+    return GNUNET_YES; /* element not valid in our operation's generation */
+  }
+
+  /* Test if element is in other peer's bloomfilter */
+  GNUNET_BLOCK_mingle_hash (&ee->element_hash,
+                            op->state->salt,
+                            &mutated_hash);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Testing mingled hash %s with salt %u\n",
+              GNUNET_h2s (&mutated_hash),
+              op->state->salt);
+  if (GNUNET_NO ==
+      GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
+                                         &mutated_hash))
+  {
+    /* remove this element */
+    send_client_removed_element (op,
+                                 &ee->element);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Reduced initialization, not starting with %s:%u\n",
+                GNUNET_h2s (&ee->element_hash),
+                ee->element.size);
+    return GNUNET_YES;
+  }
+  op->state->my_element_count++;
+  GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
+                          &ee->element_hash,
+                          &op->state->my_xor);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Filtered initialization of my_elements, adding %s:%u\n",
+              GNUNET_h2s (&ee->element_hash),
+              ee->element.size);
+  GNUNET_break (GNUNET_YES ==
+                GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
+                                                   &ee->element_hash,
+                                                   ee,
+                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
 
 
+  return GNUNET_YES;
+}
 
 
 /**
 
 
 /**
- * Iterator over hash map entries.
+ * Removes elements from our hashmap if they are not contained within the
+ * provided remote bloomfilter.
  *
  *
- * @param cls closure
+ * @param cls closure with the `struct Operation *`
  * @param key current key code
  * @param value value in the hash map
  * @param key current key code
  * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
+ * @return #GNUNET_YES (we should continue to iterate)
  */
 static int
  */
 static int
-destroy_key_to_element_iter (void *cls,
-                             uint32_t key,
-                             void *value)
+iterator_bf_reduce (void *cls,
+                   const struct GNUNET_HashCode *key,
+                   void *value)
 {
 {
-  struct KeyEntry *k = value;
-  
-  while (NULL != k)
+  struct Operation *op = cls;
+  struct ElementEntry *ee = value;
+  struct GNUNET_HashCode mutated_hash;
+
+  GNUNET_BLOCK_mingle_hash (&ee->element_hash,
+                            op->state->salt,
+                            &mutated_hash);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Testing mingled hash %s with salt %u\n",
+              GNUNET_h2s (&mutated_hash),
+              op->state->salt);
+  if (GNUNET_NO ==
+      GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
+                                         &mutated_hash))
+  {
+    GNUNET_break (0 < op->state->my_element_count);
+    op->state->my_element_count--;
+    GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
+                            &ee->element_hash,
+                            &op->state->my_xor);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Bloom filter reduction of my_elements, removing %s:%u\n",
+                GNUNET_h2s (&ee->element_hash),
+                ee->element.size);
+    GNUNET_assert (GNUNET_YES ==
+                   GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
+                                                         &ee->element_hash,
+                                                         ee));
+    send_client_removed_element (op,
+                                 &ee->element);
+  }
+  else
   {
   {
-    struct KeyEntry *k_tmp = k;
-    k = k->next_colliding;
-    GNUNET_free (k_tmp);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Bloom filter reduction of my_elements, keeping %s:%u\n",
+                GNUNET_h2s (&ee->element_hash),
+                ee->element.size);
   }
   return GNUNET_YES;
 }
 
 
 /**
   }
   return GNUNET_YES;
 }
 
 
 /**
- * Destroy a union operation, and free all resources
- * associated with it.
+ * Create initial bloomfilter based on all the elements given.
  *
  *
- * @param eo the union operation to destroy
+ * @param cls the `struct Operation *`
+ * @param key current key code
+ * @param value the `struct ElementEntry` to process
+ * @return #GNUNET_YES (we should continue to iterate)
  */
  */
-void
-_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
+static int
+iterator_bf_create (void *cls,
+                    const struct GNUNET_HashCode *key,
+                    void *value)
 {
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
-  
-  if (NULL != eo->tc)
-  {
-    GNUNET_MQ_destroy (eo->tc->mq);
-    GNUNET_MESH_tunnel_destroy (eo->tc->tunnel);
-    GNUNET_free (eo->tc);
-    eo->tc = NULL;
-  }
-
-  if (NULL != eo->remote_ibf)
-  {
-    ibf_destroy (eo->remote_ibf);
-    eo->remote_ibf = NULL;
-  }
-  if (NULL != eo->local_ibf)
-  {
-    ibf_destroy (eo->local_ibf);
-    eo->local_ibf = NULL;
-  }
-  if (NULL != eo->se)
-  {
-    strata_estimator_destroy (eo->se);
-    eo->se = NULL;
-  }
-  if (NULL != eo->key_to_element)
-  {
-    GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL);
-    GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
-    eo->key_to_element = NULL;
-  }
-
-  GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
-                               eo->set->state.u->ops_tail,
-                               eo);
-  GNUNET_free (eo);
-
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n");
-
-
-  /* FIXME: do a garbage collection of the set generations */
+  struct Operation *op = cls;
+  struct ElementEntry *ee = value;
+  struct GNUNET_HashCode mutated_hash;
+
+  GNUNET_BLOCK_mingle_hash (&ee->element_hash,
+                            op->state->salt,
+                            &mutated_hash);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Initializing BF with hash %s with salt %u\n",
+              GNUNET_h2s (&mutated_hash),
+              op->state->salt);
+  GNUNET_CONTAINER_bloomfilter_add (op->state->local_bf,
+                                    &mutated_hash);
+  return GNUNET_YES;
 }
 
 
 /**
 }
 
 
 /**
- * Inform the client that the union operation has failed,
+ * Inform the client that the intersection operation has failed,
  * and proceed to destroy the evaluate operation.
  *
  * and proceed to destroy the evaluate operation.
  *
- * @param eo the union operation to fail
+ * @param op the intersection operation to fail
  */
 static void
  */
 static void
-fail_union_operation (struct UnionEvaluateOperation *eo)
+fail_intersection_operation (struct Operation *op)
 {
 {
-  struct GNUNET_MQ_Envelope *mqm;
+  struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_SET_ResultMessage *msg;
 
   struct GNUNET_SET_ResultMessage *msg;
 
-  mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              "Intersection operation failed\n");
+  if (NULL != op->state->my_elements)
+  {
+    GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
+    op->state->my_elements = NULL;
+  }
+  ev = GNUNET_MQ_msg (msg,
+                      GNUNET_MESSAGE_TYPE_SET_RESULT);
   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
-  msg->request_id = htonl (eo->request_id);
-  GNUNET_MQ_send (eo->set->client_mq, mqm);
-  _GSS_union_operation_destroy (eo);
-}
-
-
-/**
- * Derive the IBF key from a hash code and 
- * a salt.
- *
- * @param src the hash code
- * @param salt salt to use
- * @return the derived IBF key
- */
-static struct IBF_Key
-get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
-{
-  struct IBF_Key key;
-
-  GNUNET_CRYPTO_hkdf (&key, sizeof (key),
-                     GCRY_MD_SHA512, GCRY_MD_SHA256,
-                      src, sizeof *src,
-                     &salt, sizeof (salt),
-                     NULL, 0);
-  return key;
+  msg->request_id = htonl (op->spec->client_request_id);
+  msg->element_type = htons (0);
+  GNUNET_MQ_send (op->spec->set->client_mq,
+                  ev);
+  _GSS_operation_destroy (op,
+                          GNUNET_YES);
 }
 
 
 /**
 }
 
 
 /**
- * Send a request for the evaluate operation to a remote peer
+ * Send a bloomfilter to our peer.  After the result done message has
+ * been sent to the client, destroy the evaluate operation.
  *
  *
- * @param eo operation with the other peer
+ * @param op intersection operation
  */
 static void
  */
 static void
-send_operation_request (struct UnionEvaluateOperation *eo)
+send_bloomfilter (struct Operation *op)
 {
 {
-  struct GNUNET_MQ_Envelope *mqm;
-  struct OperationRequestMessage *msg;
-
-  mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg);
-
-  if (NULL == mqm)
-  {
-    /* the context message is too large */
-    GNUNET_break (0);
-    GNUNET_SERVER_client_disconnect (eo->set->client);
-    return;
-  }
-  msg->operation = htons (GNUNET_SET_OPERATION_UNION);
-  msg->app_id = eo->app_id;
-  GNUNET_MQ_send (eo->tc->mq, mqm);
-
-  if (NULL != eo->context_msg)
+  struct GNUNET_MQ_Envelope *ev;
+  struct BFMessage *msg;
+  uint32_t bf_size;
+  uint32_t bf_elementbits;
+  uint32_t chunk_size;
+  char *bf_data;
+  uint32_t offset;
+
+  /* We consider the ratio of the set sizes to determine
+     the number of bits per element, as the smaller set
+     should use more bits to maximize its set reduction
+     potential and minimize overall bandwidth consumption. */
+  bf_elementbits = 2 + ceil (log2((double)
+                             (op->spec->remote_element_count /
+                                   (double) op->state->my_element_count)));
+  if (bf_elementbits < 1)
+    bf_elementbits = 1; /* make sure k is not 0 */
+  /* optimize BF-size to ~50% of bits set */
+  bf_size = ceil ((double) (op->state->my_element_count
+                            * bf_elementbits / log(2)));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending Bloom filter (%u) of size %u bytes\n",
+              (unsigned int) bf_elementbits,
+              (unsigned int) bf_size);
+  op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
+                                                           bf_size,
+                                                           bf_elementbits);
+  op->state->salt = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE,
+                                              UINT32_MAX);
+  GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
+                                         &iterator_bf_create,
+                                         op);
+
+  /* send our Bloom filter */
+  chunk_size = 60 * 1024 - sizeof (struct BFMessage);
+  if (bf_size <= chunk_size)
   {
   {
-    GNUNET_free (eo->context_msg);
-    eo->context_msg = NULL;
+    /* singlepart */
+    chunk_size = bf_size;
+    ev = GNUNET_MQ_msg_extra (msg,
+                              chunk_size,
+                              GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
+    GNUNET_assert (GNUNET_SYSERR !=
+                   GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
+                                                              (char*) &msg[1],
+                                                              bf_size));
+    msg->sender_element_count = htonl (op->state->my_element_count);
+    msg->bloomfilter_total_length = htonl (bf_size);
+    msg->bits_per_element = htonl (bf_elementbits);
+    msg->sender_mutator = htonl (op->state->salt);
+    msg->element_xor_hash = op->state->my_xor;
+    GNUNET_MQ_send (op->mq, ev);
   }
   }
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
-}
-
-
-/**
- * Iterator to create the mapping between ibf keys
- * and element entries.
- *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
- */
-static int
-insert_element_iterator (void *cls,
-                         uint32_t key,
-                         void *value)
-{
-  struct KeyEntry *const new_k = cls;
-  struct KeyEntry *old_k = value;
-
-  GNUNET_assert (NULL != old_k);
-  do
+  else
   {
   {
-    if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
+    /* multipart */
+    bf_data = GNUNET_malloc (bf_size);
+    GNUNET_assert (GNUNET_SYSERR !=
+                   GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
+                                                              bf_data,
+                                                              bf_size));
+    offset = 0;
+    while (offset < bf_size)
     {
     {
-      new_k->next_colliding = old_k->next_colliding;
-      old_k->next_colliding = new_k;
-      return GNUNET_NO;
+      if (bf_size - chunk_size < offset)
+        chunk_size = bf_size - offset;
+      ev = GNUNET_MQ_msg_extra (msg,
+                                chunk_size,
+                                GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
+      GNUNET_memcpy (&msg[1],
+              &bf_data[offset],
+              chunk_size);
+      offset += chunk_size;
+      msg->sender_element_count = htonl (op->state->my_element_count);
+      msg->bloomfilter_total_length = htonl (bf_size);
+      msg->bits_per_element = htonl (bf_elementbits);
+      msg->sender_mutator = htonl (op->state->salt);
+      msg->element_xor_hash = op->state->my_xor;
+      GNUNET_MQ_send (op->mq, ev);
     }
     }
-    old_k = old_k->next_colliding;
-  } while (NULL != old_k);
-  return GNUNET_YES;
+    GNUNET_free (bf_data);
+  }
+  GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
+  op->state->local_bf = NULL;
 }
 
 
 /**
 }
 
 
 /**
- * Insert an element into the union operation's
- * key-to-element mapping
+ * Signal to the client that the operation has finished and
+ * destroy the operation.
  *
  *
- * @param eo the union operation
- * @param ee the element entry
+ * @param cls operation to destroy
  */
 static void
  */
 static void
-insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
+send_client_done_and_destroy (void *cls)
 {
 {
-  int ret;
-  struct IBF_Key ibf_key;
-  struct KeyEntry *k;
-
-  ibf_key = get_ibf_key (&ee->element_hash, eo->salt);
-  k = GNUNET_new (struct KeyEntry);
-  k->element = ee;
-  k->ibf_key = ibf_key;
-  ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
-                                                      (uint32_t) ibf_key.key_val,
-                                                      insert_element_iterator, k);
-
-  /* was the element inserted into a colliding bucket? */
-  if (GNUNET_SYSERR == ret)
-    return;
+  struct Operation *op = cls;
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_SET_ResultMessage *rm;
 
 
-  GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
-                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+  ev = GNUNET_MQ_msg (rm,
+                      GNUNET_MESSAGE_TYPE_SET_RESULT);
+  rm->request_id = htonl (op->spec->client_request_id);
+  rm->result_status = htons (GNUNET_SET_STATUS_DONE);
+  rm->element_type = htons (0);
+  GNUNET_MQ_send (op->spec->set->client_mq,
+                  ev);
+  _GSS_operation_destroy (op,
+                          GNUNET_YES);
 }
 
 
 /**
 }
 
 
 /**
- * Insert a key into an ibf.
+ * Send all elements in the full result iterator.
  *
  *
- * @param cls the ibf
- * @param key unused
- * @param value the key entry to get the key from
+ * @param cls the `struct Operation *`
  */
  */
-static int
-prepare_ibf_iterator (void *cls,
-                      uint32_t key,
-                      void *value)
+static void
+send_remaining_elements (void *cls)
 {
 {
-  struct InvertibleBloomFilter *ibf = cls;
-  struct KeyEntry *ke = value;
+  struct Operation *op = cls;
+  const void *nxt;
+  const struct ElementEntry *ee;
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_SET_ResultMessage *rm;
+  const struct GNUNET_SET_Element *element;
+  int res;
 
 
-  ibf_insert (ibf, ke->ibf_key);
-  return GNUNET_YES;
+  res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter,
+                                                     NULL,
+                                                     &nxt);
+  if (GNUNET_NO == res)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Sending done and destroy because iterator ran out\n");
+    op->keep--;
+    send_client_done_and_destroy (op);
+    return;
+  }
+  ee = nxt;
+  element = &ee->element;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending element %s:%u to client (full set)\n",
+              GNUNET_h2s (&ee->element_hash),
+              element->size);
+  GNUNET_assert (0 != op->spec->client_request_id);
+  ev = GNUNET_MQ_msg_extra (rm,
+                            element->size,
+                            GNUNET_MESSAGE_TYPE_SET_RESULT);
+  GNUNET_assert (NULL != ev);
+  rm->result_status = htons (GNUNET_SET_STATUS_OK);
+  rm->request_id = htonl (op->spec->client_request_id);
+  rm->element_type = element->element_type;
+  GNUNET_memcpy (&rm[1],
+          element->data,
+          element->size);
+  GNUNET_MQ_notify_sent (ev,
+                         &send_remaining_elements,
+                         op);
+  GNUNET_MQ_send (op->spec->set->client_mq,
+                  ev);
 }
 
 
 /**
 }
 
 
 /**
- * Iterator for initializing the
- * key-to-element mapping of a union operation
+ * Inform the peer that this operation is complete.
  *
  *
- * @param cls the union operation
- * @param key unised
- * @param value the element entry to insert
- *        into the key-to-element mapping
+ * @param op the intersection operation to fail
  */
  */
-static int
-init_key_to_element_iterator (void *cls,
-                              const struct GNUNET_HashCode *key,
-                              void *value)
+static void
+send_peer_done (struct Operation *op)
 {
 {
-  struct UnionEvaluateOperation *eo = cls;
-  struct ElementEntry *e = value;
-
-  /* make sure that the element belongs to the set at the time
-   * of creating the operation */
-  if ( (e->generation_added > eo->generation_created) ||
-       ( (GNUNET_YES == e->removed) &&
-         (e->generation_removed < eo->generation_created)))
-    return GNUNET_YES;
-
-  insert_element (eo, e);
-  return GNUNET_YES;
+  struct GNUNET_MQ_Envelope *ev;
+  struct IntersectionDoneMessage *idm;
+
+  op->state->phase = PHASE_FINISHED;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Intersection succeeded, sending DONE\n");
+  GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
+  op->state->local_bf = NULL;
+
+  ev = GNUNET_MQ_msg (idm,
+                      GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
+  idm->final_element_count = htonl (op->state->my_element_count);
+  idm->element_xor_hash = op->state->my_xor;
+  GNUNET_MQ_send (op->mq,
+                  ev);
 }
 
 
 /**
 }
 
 
 /**
- * Create an ibf with the operation's elements
- * of the specified size
+ * Process a Bloomfilter once we got all the chunks.
  *
  *
- * @param eo the union operation
- * @param size size of the ibf to create
+ * @param op the intersection operation
  */
 static void
  */
 static void
-prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
+process_bf (struct Operation *op)
 {
 {
-  if (NULL == eo->key_to_element)
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
+              op->state->phase,
+              op->spec->remote_element_count,
+              op->state->my_element_count,
+              GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
+  switch (op->state->phase)
+  {
+  case PHASE_INITIAL:
+    GNUNET_break_op (0);
+    fail_intersection_operation(op);
+    return;
+  case PHASE_COUNT_SENT:
+    /* This is the first BF being sent, build our initial map with
+       filtering in place */
+    op->state->my_elements
+      = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count,
+                                              GNUNET_YES);
+    op->state->my_element_count = 0;
+    GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
+                                           &filtered_map_initialization,
+                                           op);
+    break;
+  case PHASE_BF_EXCHANGE:
+    /* Update our set by reduction */
+    GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
+                                           &iterator_bf_reduce,
+                                           op);
+    break;
+  case PHASE_FINISHED:
+    GNUNET_break_op (0);
+    fail_intersection_operation(op);
+    return;
+  }
+  GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
+  op->state->remote_bf = NULL;
+
+  if ( (0 == op->state->my_element_count) || /* fully disjoint */
+       ( (op->state->my_element_count == op->spec->remote_element_count) &&
+         (0 == memcmp (&op->state->my_xor,
+                       &op->state->other_xor,
+                       sizeof (struct GNUNET_HashCode))) ) )
   {
   {
-    unsigned int len;
-    len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
-    eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
-    GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
-                                             init_key_to_element_iterator, eo);
+    /* we are done */
+    send_peer_done (op);
+    return;
   }
   }
-  if (NULL != eo->local_ibf)
-    ibf_destroy (eo->local_ibf);
-  eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
-  GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
-                                           prepare_ibf_iterator, eo->local_ibf);
+  op->state->phase = PHASE_BF_EXCHANGE;
+  send_bloomfilter (op);
 }
 
 
 /**
 }
 
 
 /**
- * Send an ibf of appropriate size.
+ * Handle an BF message from a remote peer.
  *
  *
- * @param eo the union operation
- * @param ibf_order order of the ibf to send, size=2^order
+ * @param cls the intersection operation
+ * @param mh the header of the message
  */
 static void
  */
 static void
-send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
+handle_p2p_bf (void *cls,
+               const struct GNUNET_MessageHeader *mh)
 {
 {
-  unsigned int buckets_sent = 0;
-  struct InvertibleBloomFilter *ibf;
-
-  prepare_ibf (eo, 1<<ibf_order);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n", 1<<ibf_order);
-
-  ibf = eo->local_ibf;
-
-  while (buckets_sent < (1 << ibf_order))
+  struct Operation *op = cls;
+  const struct BFMessage *msg;
+  uint32_t bf_size;
+  uint32_t chunk_size;
+  uint32_t bf_bits_per_element;
+  uint16_t msize;
+
+  msize = htons (mh->size);
+  if (msize < sizeof (struct BFMessage))
   {
   {
-    unsigned int buckets_in_message;
-    struct GNUNET_MQ_Envelope *mqm;
-    struct IBFMessage *msg;
-
-    buckets_in_message = (1 << ibf_order) - buckets_sent;
-    /* limit to maximum */
-    if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
-      buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
-
-    mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
-                               GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
-    msg->order = ibf_order;
-    msg->offset = htons (buckets_sent);
-    ibf_write_slice (ibf, buckets_sent,
-                     buckets_in_message, &msg[1]);
-    buckets_sent += buckets_in_message;
-    GNUNET_MQ_send (eo->tc->mq, mqm);
+    GNUNET_break_op (0);
+    fail_intersection_operation (op);
+    return;
+  }
+  msg = (const struct BFMessage *) mh;
+  switch (op->state->phase)
+  {
+  case PHASE_INITIAL:
+    GNUNET_break_op (0);
+    fail_intersection_operation (op);
+    break;
+  case PHASE_COUNT_SENT:
+  case PHASE_BF_EXCHANGE:
+    bf_size = ntohl (msg->bloomfilter_total_length);
+    bf_bits_per_element = ntohl (msg->bits_per_element);
+    chunk_size = msize - sizeof (struct BFMessage);
+    op->state->other_xor = msg->element_xor_hash;
+    if (bf_size == chunk_size)
+    {
+      if (NULL != op->state->bf_data)
+      {
+        GNUNET_break_op (0);
+        fail_intersection_operation (op);
+        return;
+      }
+      /* single part, done here immediately */
+      op->state->remote_bf
+        = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
+                                             bf_size,
+                                             bf_bits_per_element);
+      op->state->salt = ntohl (msg->sender_mutator);
+      op->spec->remote_element_count = ntohl (msg->sender_element_count);
+      process_bf (op);
+      return;
+    }
+    /* multipart chunk */
+    if (NULL == op->state->bf_data)
+    {
+      /* first chunk, initialize */
+      op->state->bf_data = GNUNET_malloc (bf_size);
+      op->state->bf_data_size = bf_size;
+      op->state->bf_bits_per_element = bf_bits_per_element;
+      op->state->bf_data_offset = 0;
+      op->state->salt = ntohl (msg->sender_mutator);
+      op->spec->remote_element_count = ntohl (msg->sender_element_count);
+    }
+    else
+    {
+      /* increment */
+      if ( (op->state->bf_data_size != bf_size) ||
+           (op->state->bf_bits_per_element != bf_bits_per_element) ||
+           (op->state->bf_data_offset + chunk_size > bf_size) ||
+           (op->state->salt != ntohl (msg->sender_mutator)) ||
+           (op->spec->remote_element_count != ntohl (msg->sender_element_count)) )
+      {
+        GNUNET_break_op (0);
+        fail_intersection_operation (op);
+        return;
+      }
+    }
+    GNUNET_memcpy (&op->state->bf_data[op->state->bf_data_offset],
+            (const char*) &msg[1],
+            chunk_size);
+    op->state->bf_data_offset += chunk_size;
+    if (op->state->bf_data_offset == bf_size)
+    {
+      /* last chunk, run! */
+      op->state->remote_bf
+        = GNUNET_CONTAINER_bloomfilter_init (op->state->bf_data,
+                                             bf_size,
+                                             bf_bits_per_element);
+      GNUNET_free (op->state->bf_data);
+      op->state->bf_data = NULL;
+      op->state->bf_data_size = 0;
+      process_bf (op);
+    }
+    break;
+  default:
+    GNUNET_break_op (0);
+    fail_intersection_operation (op);
+    break;
   }
   }
-
-  eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
 }
 
 
 /**
 }
 
 
 /**
- * Send a strata estimator to the remote peer.
+ * Fills the "my_elements" hashmap with the initial set of
+ * (non-deleted) elements from the set of the specification.
  *
  *
- * @param eo the union operation with the remote peer
+ * @param cls closure with the `struct Operation *`
+ * @param key current key code for the element
+ * @param value value in the hash map with the `struct ElementEntry *`
+ * @return #GNUNET_YES (we should continue to iterate)
  */
  */
-static void
-send_strata_estimator (struct UnionEvaluateOperation *eo)
+static int
+initialize_map_unfiltered (void *cls,
+                           const struct GNUNET_HashCode *key,
+                           void *value)
 {
 {
-  struct GNUNET_MQ_Envelope *mqm;
-  struct GNUNET_MessageHeader *strata_msg;
-
-  mqm = GNUNET_MQ_msg_header_extra (strata_msg,
-                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
-                                    GNUNET_MESSAGE_TYPE_SET_P2P_SE);
-  strata_estimator_write (eo->set->state.u->se, &strata_msg[1]);
-  GNUNET_MQ_send (eo->tc->mq, mqm);
-  eo->phase = PHASE_EXPECT_IBF;
+  struct ElementEntry *ee = value;
+  struct Operation *op = cls;
+
+  if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
+    return GNUNET_YES; /* element not live in operation's generation */
+  GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
+                          &ee->element_hash,
+                          &op->state->my_xor);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Initial full initialization of my_elements, adding %s:%u\n",
+              GNUNET_h2s (&ee->element_hash),
+              ee->element.size);
+  GNUNET_break (GNUNET_YES ==
+                GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
+                                                   &ee->element_hash,
+                                                   ee,
+                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  return GNUNET_YES;
 }
 
 
 /**
 }
 
 
 /**
- * Compute the necessary order of an ibf
- * from the size of the symmetric set difference.
+ * Send our element count to the peer, in case our element count is
+ * lower than his.
  *
  *
- * @param diff the difference
- * @return the required size of the ibf
+ * @param op intersection operation
  */
  */
-static unsigned int
-get_order_from_difference (unsigned int diff)
+static void
+send_element_count (struct Operation *op)
 {
 {
-  unsigned int ibf_order;
-
-  ibf_order = 2;
-  while ((1<<ibf_order) < (2 * diff))
-    ibf_order++;
-  if (ibf_order > MAX_IBF_ORDER)
-    ibf_order = MAX_IBF_ORDER;
-  return ibf_order;
+  struct GNUNET_MQ_Envelope *ev;
+  struct IntersectionElementInfoMessage *msg;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending our element count (%u)\n",
+              op->state->my_element_count);
+  ev = GNUNET_MQ_msg (msg,
+                      GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
+  msg->sender_element_count = htonl (op->state->my_element_count);
+  GNUNET_MQ_send (op->mq, ev);
 }
 
 
 /**
 }
 
 
 /**
- * Handle a strata estimator from a remote peer
+ * We go first, initialize our map with all elements and
+ * send the first Bloom filter.
  *
  *
- * @param cls the union operation
- * @param mh the message
+ * @param op operation to start exchange for
  */
 static void
  */
 static void
-handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
+begin_bf_exchange (struct Operation *op)
 {
 {
-  struct UnionEvaluateOperation *eo = cls;
-  struct StrataEstimator *remote_se;
-  int diff;
-
-
-  if (eo->phase != PHASE_EXPECT_SE)
-  {
-    fail_union_operation (eo);
-    GNUNET_break (0);
-    return;
-  }
-  remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
-                                       SE_IBF_HASH_NUM);
-  strata_estimator_read (&mh[1], remote_se);
-  GNUNET_assert (NULL != eo->se);
-  diff = strata_estimator_difference (remote_se, eo->se);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff);
-  strata_estimator_destroy (remote_se);
-  strata_estimator_destroy (eo->se);
-  eo->se = NULL;
-  send_ibf (eo, get_order_from_difference (diff));
+  op->state->phase = PHASE_BF_EXCHANGE;
+  op->state->my_elements
+    = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count,
+                                            GNUNET_YES);
+  GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
+                                         &initialize_map_unfiltered,
+                                         op);
+  send_bloomfilter (op);
 }
 
 
 }
 
 
-
 /**
 /**
- * Iterator to send elements to a remote peer
+ * Handle the initial `struct IntersectionElementInfoMessage` from a
+ * remote peer.
  *
  *
- * @param cls closure with the element key and the union operation
- * @param key ignored
- * @param value the key entry
+ * @param cls the intersection operation
+ * @param mh the header of the message
  */
  */
-static int
-send_element_iterator (void *cls,
-                       uint32_t key,
-                       void *value)
+static void
+handle_p2p_element_info (void *cls,
+                         const struct GNUNET_MessageHeader *mh)
 {
 {
-  struct SendElementClosure *sec = cls;
-  struct IBF_Key ibf_key = sec->ibf_key;
-  struct UnionEvaluateOperation *eo = sec->eo;
-  struct KeyEntry *ke = value;
+  struct Operation *op = cls;
+  const struct IntersectionElementInfoMessage *msg;
 
 
-  if (ke->ibf_key.key_val != ibf_key.key_val)
-    return GNUNET_YES;
-  while (NULL != ke)
+  if (ntohs (mh->size) != sizeof (struct IntersectionElementInfoMessage))
   {
   {
-    const struct GNUNET_SET_Element *const element = &ke->element->element;
-    struct GNUNET_MQ_Envelope *mqm;
-    struct GNUNET_MessageHeader *mh;
-
-    GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
-    mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
-    if (NULL == mqm)
-    {
-      /* element too large */
-      GNUNET_break (0);
-      continue;
-    }
-    memcpy (&mh[1], element->data, element->size);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
-    GNUNET_MQ_send (eo->tc->mq, mqm);
-    ke = ke->next_colliding;
+    GNUNET_break_op (0);
+    fail_intersection_operation(op);
+    return;
   }
   }
-  return GNUNET_NO;
+  msg = (const struct IntersectionElementInfoMessage *) mh;
+  op->spec->remote_element_count = ntohl (msg->sender_element_count);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received remote element count (%u), I have %u\n",
+              op->spec->remote_element_count,
+              op->state->my_element_count);
+  if ( ( (PHASE_INITIAL != op->state->phase) &&
+         (PHASE_COUNT_SENT != op->state->phase) ) ||
+       (op->state->my_element_count > op->spec->remote_element_count) ||
+       (0 == op->state->my_element_count) ||
+       (0 == op->spec->remote_element_count) )
+  {
+    GNUNET_break_op (0);
+    fail_intersection_operation(op);
+    return;
+  }
+  GNUNET_break (NULL == op->state->remote_bf);
+  begin_bf_exchange (op);
 }
 
 }
 
+
 /**
 /**
- * Send all elements that have the specified IBF key
- * to the remote peer of the union operation
+ * Send a result message to the client indicating that the operation
+ * is over.  After the result done message has been sent to the
+ * client, destroy the evaluate operation.
  *
  *
- * @param eo union operation
- * @param ibf_key IBF key of interest
+ * @param op intersection operation
  */
 static void
  */
 static void
-send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key)
+finish_and_destroy (struct Operation *op)
 {
 {
-  struct SendElementClosure send_cls;
+  GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
 
 
-  send_cls.ibf_key = ibf_key;
-  send_cls.eo = eo;
-  GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
-                                                &send_element_iterator, &send_cls);
+  if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Sending full result set (%u elements)\n",
+                GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
+    op->state->full_result_iter
+      = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
+    op->keep++;
+    send_remaining_elements (op);
+    return;
+  }
+  send_client_done_and_destroy (op);
 }
 
 
 /**
 }
 
 
 /**
- * Decode which elements are missing on each side, and
- * send the appropriate elemens and requests
+ * Remove all elements from our hashmap.
  *
  *
- * @param eo union operation
+ * @param cls closure with the `struct Operation *`
+ * @param key current key code
+ * @param value value in the hash map
+ * @return #GNUNET_YES (we should continue to iterate)
  */
  */
-static void
-decode_and_send (struct UnionEvaluateOperation *eo)
+static int
+filter_all (void *cls,
+            const struct GNUNET_HashCode *key,
+            void *value)
 {
 {
-  struct IBF_Key key;
-  int side;
-  struct InvertibleBloomFilter *diff_ibf;
-
-  GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
-
-  prepare_ibf (eo, eo->remote_ibf->size);
-  diff_ibf = ibf_dup (eo->local_ibf);
-  ibf_subtract (diff_ibf, eo->remote_ibf);
-
-  while (1)
-  {
-    int res;
-
-    res = ibf_decode (diff_ibf, &side, &key);
-    if (GNUNET_SYSERR == res)
-    {
-      int next_order;
-      next_order = 0;
-      while (1<<next_order < diff_ibf->size)
-        next_order++;
-      next_order++;
-      if (next_order <= MAX_IBF_ORDER)
-      {
-        GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 
-                   "decoding failed, sending larger ibf (size %u)\n",
-                    1<<next_order);
-        send_ibf (eo, next_order);
-      }
-      else
-      {
-        GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 
-                   "set union failed: reached ibf limit\n");
-      }
-      break;
-    }
-    if (GNUNET_NO == res)
-    {
-      struct GNUNET_MQ_Envelope *mqm;
+  struct Operation *op = cls;
+  struct ElementEntry *ee = value;
 
 
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
-      mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
-      GNUNET_MQ_send (eo->tc->mq, mqm);
-      break;
-    }
-    if (1 == side)
-    {
-      send_elements_for_key (eo, key);
-    }
-    else
-    {
-      struct GNUNET_MQ_Envelope *mqm;
-      struct GNUNET_MessageHeader *msg;
-
-      /* FIXME: before sending the request, check if we may just have the element */
-      /* FIXME: merge multiple requests */
-      mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
-                                        GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
-      *(struct IBF_Key *) &msg[1] = key;
-      GNUNET_MQ_send (eo->tc->mq, mqm);
-    }
-  }
-  ibf_destroy (diff_ibf);
+  GNUNET_break (0 < op->state->my_element_count);
+  op->state->my_element_count--;
+  GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
+                          &ee->element_hash,
+                          &op->state->my_xor);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Final reduction of my_elements, removing %s:%u\n",
+              GNUNET_h2s (&ee->element_hash),
+              ee->element.size);
+  GNUNET_assert (GNUNET_YES ==
+                 GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
+                                                       &ee->element_hash,
+                                                       ee));
+  send_client_removed_element (op,
+                               &ee->element);
+  return GNUNET_YES;
 }
 
 
 /**
 }
 
 
 /**
- * Handle an IBF message from a remote peer.
+ * Handle a done message from a remote peer
  *
  *
- * @param cls the union operation
- * @param mh the header of the message
+ * @param cls the intersection operation
+ * @param mh the message
  */
 static void
  */
 static void
-handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
+handle_p2p_done (void *cls,
+                 const struct GNUNET_MessageHeader *mh)
 {
 {
-  struct UnionEvaluateOperation *eo = cls;
-  struct IBFMessage *msg = (struct IBFMessage *) mh;
-  unsigned int buckets_in_message;
+  struct Operation *op = cls;
+  const struct IntersectionDoneMessage *idm;
 
 
-  if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
-       (eo->phase == PHASE_EXPECT_IBF) )
+  if (PHASE_BF_EXCHANGE != op->state->phase)
   {
   {
-    eo->phase = PHASE_EXPECT_IBF_CONT;
-    GNUNET_assert (NULL == eo->remote_ibf);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<<msg->order);
-    eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
-    if (0 != ntohs (msg->offset))
-    {
-      GNUNET_break (0);
-      fail_union_operation (eo);
-    }
+    /* wrong phase to conclude? FIXME: Or should we allow this
+       if the other peer has _initially_ already an empty set? */
+    GNUNET_break_op (0);
+    fail_intersection_operation (op);
+    return;
   }
   }
-  else if (eo->phase == PHASE_EXPECT_IBF_CONT)
+  if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage))
   {
   {
-    if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
-         (1<<msg->order != eo->remote_ibf->size) )
-    {
-      GNUNET_break (0);
-      fail_union_operation (eo);
-      return;
-    }
+    GNUNET_break_op (0);
+    fail_intersection_operation (op);
+    return;
   }
   }
-
-  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
-
-  if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
+  idm = (const struct IntersectionDoneMessage *) mh;
+  if (0 == ntohl (idm->final_element_count))
   {
   {
-    GNUNET_break (0);
-    fail_union_operation (eo);
-    return;
+    /* other peer determined empty set is the intersection,
+       remove all elements */
+    GNUNET_CONTAINER_multihashmap_iterate (op->state->my_elements,
+                                           &filter_all,
+                                           op);
   }
   }
-  
-  ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
-  eo->ibf_buckets_received += buckets_in_message;
-
-  if (eo->ibf_buckets_received == eo->remote_ibf->size)
+  if ( (op->state->my_element_count != ntohl (idm->final_element_count)) ||
+       (0 != memcmp (&op->state->my_xor,
+                     &idm->element_xor_hash,
+                     sizeof (struct GNUNET_HashCode))) )
   {
   {
-
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n");
-    eo->phase = PHASE_EXPECT_ELEMENTS;
-    decode_and_send (eo);
+    /* Other peer thinks we are done, but we disagree on the result! */
+    GNUNET_break_op (0);
+    fail_intersection_operation (op);
+    return;
   }
   }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Got IntersectionDoneMessage, have %u elements in intersection\n",
+              op->state->my_element_count);
+  op->state->phase = PHASE_FINISHED;
+  finish_and_destroy (op);
 }
 
 
 /**
 }
 
 
 /**
- * Send a result message to the client indicating
- * that there is a new element.
+ * Initiate a set intersection operation with a remote peer.
  *
  *
- * @param eo union operation
- * @param element element to send
+ * @param op operation that is created, should be initialized to
+ *        begin the evaluation
+ * @param opaque_context message to be transmitted to the listener
+ *        to convince him to accept, may be NULL
  */
 static void
  */
 static void
-send_client_element (struct UnionEvaluateOperation *eo,
-                     struct GNUNET_SET_Element *element)
+intersection_evaluate (struct Operation *op,
+                       const struct GNUNET_MessageHeader *opaque_context)
 {
 {
-  struct GNUNET_MQ_Envelope *mqm;
-  struct GNUNET_SET_ResultMessage *rm;
+  struct GNUNET_MQ_Envelope *ev;
+  struct OperationRequestMessage *msg;
 
 
-  GNUNET_assert (0 != eo->request_id);
-  mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
-  if (NULL == mqm)
+  op->state = GNUNET_new (struct OperationState);
+  /* we started the operation, thus we have to send the operation request */
+  op->state->phase = PHASE_INITIAL;
+  op->state->my_element_count = op->spec->set->state->current_set_element_count;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Initiating intersection operation evaluation\n");
+  ev = GNUNET_MQ_msg_nested_mh (msg,
+                                GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
+                                opaque_context);
+  if (NULL == ev)
   {
   {
-    GNUNET_MQ_discard (mqm);
+    /* the context message is too large!? */
     GNUNET_break (0);
     GNUNET_break (0);
+    GNUNET_SERVER_client_disconnect (op->spec->set->client);
     return;
   }
     return;
   }
-  rm->result_status = htons (GNUNET_SET_STATUS_OK);
-  rm->request_id = htonl (eo->request_id);
-  memcpy (&rm[1], element->data, element->size);
-  GNUNET_MQ_send (eo->set->client_mq, mqm);
+  msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
+  msg->element_count = htonl (op->state->my_element_count);
+  GNUNET_MQ_send (op->mq,
+                  ev);
+  op->state->phase = PHASE_COUNT_SENT;
+  if (NULL != opaque_context)
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Sent op request with context message\n");
+  else
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Sent op request without context message\n");
 }
 
 
 /**
 }
 
 
 /**
- * Send a result message to the client indicating
- * that the operation is over.
- * After the result done message has been sent to the client,
- * destroy the evaluate operation.
+ * Accept an intersection operation request from a remote peer.  Only
+ * initializes the private operation state.
  *
  *
- * @param eo union operation
+ * @param op operation that will be accepted as an intersection operation
  */
 static void
  */
 static void
-send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
+intersection_accept (struct Operation *op)
 {
 {
-  struct GNUNET_MQ_Envelope *mqm;
-  struct GNUNET_SET_ResultMessage *rm;
-
-  GNUNET_assert (0 != eo->request_id);
-  mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
-  rm->request_id = htonl (eo->request_id);
-  rm->result_status = htons (GNUNET_SET_STATUS_DONE);
-  GNUNET_MQ_send (eo->set->client_mq, mqm);
-
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Accepting set intersection operation\n");
+  op->state = GNUNET_new (struct OperationState);
+  op->state->phase = PHASE_INITIAL;
+  op->state->my_element_count
+    = op->spec->set->state->current_set_element_count;
+  op->state->my_elements
+    = GNUNET_CONTAINER_multihashmap_create
+    (GNUNET_MIN (op->state->my_element_count,
+                 op->spec->remote_element_count),
+     GNUNET_YES);
+  if (op->spec->remote_element_count < op->state->my_element_count)
+  {
+    /* If the other peer (Alice) has fewer elements than us (Bob),
+       we just send the count as Alice should send the first BF */
+    send_element_count (op);
+    op->state->phase = PHASE_COUNT_SENT;
+    return;
+  }
+  /* We have fewer elements, so we start with the BF */
+  begin_bf_exchange (op);
 }
 
 
 /**
 }
 
 
 /**
- * Handle an element message from a remote peer.
+ * Dispatch messages for a intersection operation.
  *
  *
- * @param cls the union operation
- * @param mh the message
+ * @param op the state of the intersection evaluate operation
+ * @param mh the received message
+ * @return #GNUNET_SYSERR if the tunnel should be disconnected,
+ *         #GNUNET_OK otherwise
  */
  */
-static void
-handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
+static int
+intersection_handle_p2p_message (struct Operation *op,
+                                 const struct GNUNET_MessageHeader *mh)
 {
 {
-  struct UnionEvaluateOperation *eo = cls;
-  struct ElementEntry *ee;
-  uint16_t element_size;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n");
-
-  if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
-       (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received p2p message (t: %u, s: %u)\n",
+              ntohs (mh->type), ntohs (mh->size));
+  switch (ntohs (mh->type))
   {
   {
-    fail_union_operation (eo);
-    GNUNET_break (0);
-    return;
+    /* this message handler is not active until after we received an
+     * operation request message, thus the ops request is not handled here
+     */
+  case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
+    handle_p2p_element_info (op, mh);
+    break;
+  case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
+    handle_p2p_bf (op, mh);
+    break;
+  case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE:
+    handle_p2p_done (op, mh);
+    break;
+  default:
+    /* something wrong with cadet's message handlers? */
+    GNUNET_assert (0);
   }
   }
-  element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
-  ee = GNUNET_malloc (sizeof *eo + element_size);
-  memcpy (&ee[1], &mh[1], element_size);
-  ee->element.data = &ee[1];
-  ee->remote = GNUNET_YES;
-
-  insert_element (eo, ee);
-  send_client_element (eo, &ee->element);
-
-  GNUNET_free (ee);
+  return GNUNET_OK;
 }
 
 
 /**
 }
 
 
 /**
- * Handle an element request from a remote peer.
+ * Handler for peer-disconnects, notifies the client about the aborted
+ * operation.  If we did not expect anything from the other peer, we
+ * gracefully terminate the operation.
  *
  *
- * @param cls the union operation
- * @param mh the message
+ * @param op the destroyed operation
  */
 static void
  */
 static void
-handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
+intersection_peer_disconnect (struct Operation *op)
 {
 {
-  struct UnionEvaluateOperation *eo = cls;
-  struct IBF_Key *ibf_key;
-  unsigned int num_keys;
-
-  /* look up elements and send them */
-  if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
-  {
-    GNUNET_break (0);
-    fail_union_operation (eo);
-    return;
-  }
-
-  num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
-
-  if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
+  if (PHASE_FINISHED != op->state->phase)
   {
   {
-    GNUNET_break (0);
-    fail_union_operation (eo);
+    fail_intersection_operation (op);
     return;
   }
     return;
   }
-
-  ibf_key = (struct IBF_Key *) &mh[1];
-  while (0 != num_keys--)
-  {
-    send_elements_for_key (eo, *ibf_key);
-    ibf_key++;
-  }
+  /* the session has already been concluded */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Other peer disconnected (finished)\n");
+  if (GNUNET_NO == op->state->client_done_sent)
+    finish_and_destroy (op);
 }
 
 
 /**
 }
 
 
 /**
- * Callback used for notifications
+ * Destroy the intersection operation.  Only things specific to the
+ * intersection operation are destroyed.
  *
  *
- * @param cls closure
- */
-static void
-peer_done_sent_cb (void *cls)
-{
-  struct UnionEvaluateOperation *eo = cls;
-
-  send_client_done_and_destroy (eo);
-}
-
-
-/**
- * Handle a done message from a remote peer
- * 
- * @param cls the union operation
- * @param mh the message
+ * @param op intersection operation to destroy
  */
 static void
  */
 static void
-handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
+intersection_op_cancel (struct Operation *op)
 {
 {
-  struct UnionEvaluateOperation *eo = cls;
-
-  if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
+  /* check if the op was canceled twice */
+  GNUNET_assert (NULL != op->state);
+  if (NULL != op->state->remote_bf)
   {
   {
-    /* we got all requests, but still have to send our elements as response */
-    struct GNUNET_MQ_Envelope *mqm;
-
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n");
-    eo->phase = PHASE_FINISHED;
-    mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
-    GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo);
-    GNUNET_MQ_send (eo->tc->mq, mqm);
-    return;
+    GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
+    op->state->remote_bf = NULL;
   }
   }
-  if (eo->phase == PHASE_EXPECT_ELEMENTS)
+  if (NULL != op->state->local_bf)
   {
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n");
-    eo->phase = PHASE_FINISHED;
-    send_client_done_and_destroy (eo);
-    return;
+    GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
+    op->state->local_bf = NULL;
   }
   }
-  GNUNET_break (0);
-  fail_union_operation (eo);
-}
-
-
-/**
- * Evaluate a union operation with
- * a remote peer.
- *
- * @param m the evaluate request message from the client
- * @param set the set to evaluate the operation with
- */
-void
-_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set)
-{
-  struct IntersectionEvaluateOperation *eo;
-  struct GNUNET_MessageHeader *context_msg;
-
-  eo = GNUNET_new (struct IntersectionEvaluateOperation);
-  eo->peer = m->target_peer;
-  eo->set = set;
-  eo->request_id = htonl (m->request_id);
-  GNUNET_assert (0 != eo->request_id);
-  eo->se = strata_estimator_dup (set->state.i->se);
-  eo->salt = ntohs (m->salt);
-  eo->app_id = m->app_id;
-  
-  context_msg = GNUNET_MQ_extract_nested_mh (m);
-  if (NULL != context_msg)
+  if (NULL != op->state->my_elements)
   {
   {
-    eo->context_msg = GNUNET_copy_message (context_msg);
+    GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
+    op->state->my_elements = NULL;
   }
   }
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
-             "evaluating intersection operation, (app %s)\n", 
-              GNUNET_h2s (&eo->app_id));
-
-  eo->tc = GNUNET_new (struct TunnelContext);
-  eo->tc->tunnel = GNUNET_MESH_tunnel_create (mesh, eo->tc, &eo->peer,
-                                              GNUNET_APPLICATION_TYPE_SET);
-  GNUNET_assert (NULL != eo->tc->tunnel);
-  eo->tc->peer = eo->peer;
-  eo->tc->mq = GNUNET_MESH_mq_create (eo->tc->tunnel);
-  /* we started the operation, thus we have to send the operation request */
-  eo->phase = PHASE_EXPECT_SE;
-
-  GNUNET_CONTAINER_DLL_insert (eo->set->state.i->ops_head,
-                               eo->set->state.i->ops_tail,
-                               eo);
-
-  send_operation_request (eo);
+  GNUNET_free (op->state);
+  op->state = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Destroying intersection op state done\n");
 }
 
 
 /**
 }
 
 
 /**
- * Accept an union operation request from a remote peer
+ * Create a new set supporting the intersection operation.
  *
  *
- * @param m the accept message from the client
- * @param set the set of the client
- * @param incoming information about the requesting remote peer
+ * @return the newly created set
  */
  */
-void
-_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
-                   struct Incoming *incoming)
+static struct SetState *
+intersection_set_create ()
 {
 {
-  struct UnionEvaluateOperation *eo;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
-
-  eo = GNUNET_new (struct UnionEvaluateOperation);
-  eo->tc = incoming->tc;
-  eo->generation_created = set->state.u->current_generation++;
-  eo->set = set;
-  eo->salt = ntohs (incoming->salt);
-  GNUNET_assert (0 != ntohl (m->request_id));
-  eo->request_id = ntohl (m->request_id);
-  eo->se = strata_estimator_dup (set->state.u->se);
-  /* transfer ownership of mq and socket from incoming to eo */
-  GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
-                               eo->set->state.u->ops_tail,
-                               eo);
-  /* kick off the operation */
-  send_strata_estimator (eo);
-}
+  struct SetState *set_state;
 
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Intersection set created\n");
+  set_state = GNUNET_new (struct SetState);
+  set_state->current_set_element_count = 0;
 
 
-/**
- * Create a new set supporting the intersection operation
- *
- * @return the newly created set
- */
-struct Set *
-_GSS_intersection_set_create (void)
-{
-  struct Set *set;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "intersection set created\n");
-  
-  set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct IntersectionState));
-  set->state.i = (struct IntersectionState *) &set[1];
-  set->operation = GNUNET_SET_OPERATION_INTERSECTION;
-  /* keys of the hash map are stored in the element entrys, thus we do not
-   * want the hash map to copy them */
-  set->state.i->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
-  set->state.i->se = strata_estimator_create (SE_STRATA_COUNT,
-                                              SE_IBF_SIZE, SE_IBF_HASH_NUM);  
-  return set;
+  return set_state;
 }
 
 
 /**
  * Add the element from the given element message to the set.
  *
 }
 
 
 /**
  * Add the element from the given element message to the set.
  *
- * @param m message with the element
- * @param set set to add the element to
+ * @param set_state state of the set want to add to
+ * @param ee the element to add to the set
  */
  */
-void
-_GSS_intersection_add (struct GNUNET_SET_ElementMessage *m, struct Set *set)
+static void
+intersection_add (struct SetState *set_state,
+                  struct ElementEntry *ee)
 {
 {
-  struct ElementEntry *ee;
-  struct ElementEntry *ee_dup;
-  uint16_t element_size;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n");
-
-  GNUNET_assert (GNUNET_SET_OPERATION_INTERSECTION == set->operation);
-  element_size = ntohs (m->header.size) - sizeof *m;
-  ee = GNUNET_malloc (element_size + sizeof *ee);
-  ee->element.size = element_size;
-  memcpy (&ee[1], &m[1], element_size);
-  ee->element.data = &ee[1];
-  ee->generation_added = set->state.i->current_generation;
-  GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
-  ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.i->elements, &ee->element_hash);
-  if (NULL != ee_dup)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
-    GNUNET_free (ee);
-    return;
-  }
-  GNUNET_CONTAINER_multihashmap_put (set->state.i->elements, &ee->element_hash, ee,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
-  strata_estimator_insert (set->state.i->se, get_ibf_key (&ee->element_hash, 0));
+  set_state->current_set_element_count++;
 }
 
 
 /**
 }
 
 
 /**
- * Destroy a set that supports the union operation
+ * Destroy a set that supports the intersection operation
  *
  *
- * @param the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
+ * @param set_state the set to destroy
  */
  */
-void
-_GSS_union_set_destroy (struct Set *set)
+static void
+intersection_set_destroy (struct SetState *set_state)
 {
 {
-  GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
-  if (NULL != set->client)
-  {
-    GNUNET_SERVER_client_drop (set->client);
-    set->client = NULL;
-  }
-  if (NULL != set->client_mq)
-  {
-    GNUNET_MQ_destroy (set->client_mq);
-    set->client_mq = NULL;
-  }
-
-  if (NULL != set->state.u->se)
-  {
-    strata_estimator_destroy (set->state.u->se);
-    set->state.u->se = NULL;
-  }
-
-  destroy_elements (set->state.u);
-
-  while (NULL != set->state.u->ops_head)
-  {
-    _GSS_union_operation_destroy (set->state.u->ops_head);
-  }
+  GNUNET_free (set_state);
 }
 
 }
 
+
 /**
  * Remove the element given in the element message from the set.
 /**
  * Remove the element given in the element message from the set.
- * Only marks the element as removed, so that older set operations can still exchange it.
  *
  *
- * @param m message with the element
- * @param set set to remove the element from
+ * @param set_state state of the set to remove from
+ * @param element set element to remove
  */
  */
-void
-_GSS_intersection_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set)
+static void
+intersection_remove (struct SetState *set_state,
+                     struct ElementEntry *element)
 {
 {
-  struct GNUNET_HashCode hash;
-  struct ElementEntry *ee;
-
-  GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
-  GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash);
-  ee = GNUNET_CONTAINER_multihashmap_get (set->state.i->elements, &hash);
-  if (NULL == ee)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
-    return;
-  }
-  if (GNUNET_YES == ee->removed)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
-    return;
-  }
-  ee->removed = GNUNET_YES;
-  ee->generation_removed = set->state.i->current_generation;
+  GNUNET_assert (0 < set_state->current_set_element_count);
+  set_state->current_set_element_count--;
 }
 
 
 /**
 }
 
 
 /**
- * Dispatch messages for a union operation.
+ * Get the table with implementing functions for set intersection.
  *
  *
- * @param cls closure
- * @param tunnel mesh tunnel
- * @param tunnel_ctx tunnel context
- * @param sender ???
- * @param mh message to process
- * @return ???
+ * @return the operation specific VTable
  */
  */
-int
-_GSS_union_handle_p2p_message (void *cls,
-                               struct GNUNET_MESH_Tunnel *tunnel,
-                               void **tunnel_ctx,
-                               const struct GNUNET_PeerIdentity *sender,
-                               const struct GNUNET_MessageHeader *mh)
+const struct SetVT *
+_GSS_intersection_vt ()
 {
 {
-  struct TunnelContext *tc = *tunnel_ctx;
-  struct UnionEvaluateOperation *eo;
-
-  if (CONTEXT_OPERATION_UNION != tc->type)
-  {
-    /* FIXME: kill the tunnel */
-    /* never kill mesh */
-    return GNUNET_OK;
-  }
-
-  eo = tc->data;
-
-  switch (ntohs (mh->type))
-  {
-    case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
-      handle_p2p_ibf (eo, mh);
-      break;
-    case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
-      handle_p2p_strata_estimator (eo, mh);
-      break;
-    case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
-      handle_p2p_elements (eo, mh);
-      break;
-    case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
-      handle_p2p_element_requests (eo, mh);
-      break;
-    case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
-      handle_p2p_done (eo, mh);
-      break;
-    default:
-      /* something wrong with mesh's message handlers? */
-      GNUNET_assert (0);
-  }
-  /* never kill mesh! */
-  return GNUNET_OK;
+  static const struct SetVT intersection_vt = {
+    .create = &intersection_set_create,
+    .msg_handler = &intersection_handle_p2p_message,
+    .add = &intersection_add,
+    .remove = &intersection_remove,
+    .destroy_set = &intersection_set_destroy,
+    .evaluate = &intersection_evaluate,
+    .accept = &intersection_accept,
+    .peer_disconnect = &intersection_peer_disconnect,
+    .cancel = &intersection_op_cancel,
+  };
+
+  return &intersection_vt;
 }
 }