clean up internal set API, avoid copying context message needlessly
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
index 5b1f28cf432b32258274802c902043e7af52fc96..5926b25f597f6c1515d43f79676f13a7c4780405 100644 (file)
@@ -4,7 +4,7 @@
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
-      by the Free Software Foundation; either version 2, or (at your
+      by the Free Software Foundation; either version 3, or (at your
       option) any later version.
 
       GNUnet is distributed in the hope that it will be useful, but
       option) any later version.
 
       GNUnet is distributed in the hope that it will be useful, but
  * @brief two-peer set operations
  * @author Florian Dold
  */
  * @brief two-peer set operations
  * @author Florian Dold
  */
-
-
+#include "platform.h"
+#include "gnunet_util_lib.h"
 #include "gnunet-service-set.h"
 #include "gnunet-service-set.h"
-#include "gnunet_container_lib.h"
-#include "gnunet_crypto_lib.h"
 #include "ibf.h"
 #include "strata_estimator.h"
 #include "set_protocol.h"
 #include "ibf.h"
 #include "strata_estimator.h"
 #include "set_protocol.h"
@@ -45,7 +43,7 @@
 /**
  * hash num parameter for the difference digests and strata estimators
  */
 /**
  * hash num parameter for the difference digests and strata estimators
  */
-#define SE_IBF_HASH_NUM 3
+#define SE_IBF_HASH_NUM 4
 
 /**
  * Number of buckets that can be transmitted in one message.
 
 /**
  * Number of buckets that can be transmitted in one message.
  */
 #define MAX_IBF_ORDER (16)
 
  */
 #define MAX_IBF_ORDER (16)
 
+/**
+ * Number of buckets used in the ibf per estimated
+ * difference.
+ */
+#define IBF_ALPHA 4
+
 
 /**
 
 /**
- * Current phase we are in for a union operation
+ * Current phase we are in for a union operation.
  */
 enum UnionOperationPhase
 {
  */
 enum UnionOperationPhase
 {
@@ -69,25 +73,43 @@ enum UnionOperationPhase
    * We sent the request message, and expect a strata estimator
    */
   PHASE_EXPECT_SE,
    * We sent the request message, and expect a strata estimator
    */
   PHASE_EXPECT_SE,
+
   /**
   /**
-   * We sent the strata estimator, and expect an IBF
+   * We sent the strata estimator, and expect an IBF. This phase is entered once
+   * upon initialization and later via PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
+   *
+   * After receiving the complete IBF, we enter PHASE_EXPECT_ELEMENTS
    */
   PHASE_EXPECT_IBF,
    */
   PHASE_EXPECT_IBF,
+
   /**
   /**
-   * We know what type of IBF the other peer wants to send us,
-   * and expect the remaining parts
+   * Continuation for multi part IBFs.
    */
   PHASE_EXPECT_IBF_CONT,
    */
   PHASE_EXPECT_IBF_CONT,
+
   /**
    * We are sending request and elements,
    * and thus only expect elements from the other peer.
   /**
    * We are sending request and elements,
    * and thus only expect elements from the other peer.
+   *
+   * We are currently decoding an IBF until it can no longer be decoded,
+   * we currently send requests and expect elements
+   * The remote peer is in PHASE_EXPECT_ELEMENTS_AND_REQUESTS
    */
   PHASE_EXPECT_ELEMENTS,
    */
   PHASE_EXPECT_ELEMENTS,
+
   /**
    * We are expecting elements and requests, and send
    * requested elements back to the other peer.
   /**
    * We are expecting elements and requests, and send
    * requested elements back to the other peer.
+   *
+   * We are in this phase if we have SENT an IBF for the remote peer to decode.
+   * We expect requests, send elements or could receive an new IBF, which takes
+   * us via PHASE_EXPECT_IBF to phase PHASE_EXPECT_ELEMENTS
+   *
+   * The remote peer is thus in:
+   * PHASE_EXPECT_ELEMENTS
    */
   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
    */
   PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
+
   /**
    * The protocol is over.
    * Results may still have to be sent to the client.
   /**
    * The protocol is over.
    * Results may still have to be sent to the client.
@@ -97,48 +119,10 @@ enum UnionOperationPhase
 
 
 /**
 
 
 /**
- * State of an evaluate operation
- * with another peer.
+ * State of an evaluate operation with another peer.
  */
  */
-struct UnionEvaluateOperation
+struct OperationState
 {
 {
-  /**
-   * Local set the operation is evaluated on.
-   */
-  struct Set *set;
-
-  /**
-   * Peer with the remote set
-   */
-  struct GNUNET_PeerIdentity peer;
-
-  /**
-   * Application-specific identifier
-   */
-  struct GNUNET_HashCode app_id;
-
-  /**
-   * Context message, given to us
-   * by the client, may be NULL.
-   */
-  struct GNUNET_MessageHeader *context_msg;
-
-  /**
-   * Tunnel context for the peer we
-   * evaluate the union operation with.
-   */
-  struct TunnelContext *tc;
-
-  /**
-   * Request ID to multiplex set operations to
-   * the client inhabiting the set.
-   */
-  uint32_t request_id;
-
-  /**
-   * Number of ibf buckets received
-   */
-  unsigned int ibf_buckets_received;
 
   /**
    * Copy of the set's strata estimator at the time of
 
   /**
    * Copy of the set's strata estimator at the time of
@@ -158,89 +142,37 @@ struct UnionEvaluateOperation
 
   /**
    * Maps IBF-Keys (specific to the current salt) to elements.
 
   /**
    * Maps IBF-Keys (specific to the current salt) to elements.
+   * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
+   * Colliding IBF-Keys are linked.
    */
   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
 
   /**
    */
   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
 
   /**
-   * Current state of the operation.
-   */
-  enum UnionOperationPhase phase;
-
-  /**
-   * Salt to use for this operation.
+   * Iterator for sending elements on the key to element mapping to the client.
    */
    */
-  uint16_t salt;
+  struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
 
   /**
 
   /**
-   * Generation in which the operation handle
-   * was created.
-   */
-  unsigned int generation_created;
-  
-  /**
-   * Evaluate operations are held in
-   * a linked list.
-   */
-  struct UnionEvaluateOperation *next;
-  
-   /**
-   * Evaluate operations are held in
-   * a linked list.
-   */
-  struct UnionEvaluateOperation *prev;
-};
-
-
-/**
- * Information about the element in a set.
- * All elements are stored in a hash-table
- * from their hash-code to their 'struct Element',
- * so that the remove and add operations are reasonably
- * fast.
- */
-struct ElementEntry
-{
-  /**
-   * The actual element. The data for the element
-   * should be allocated at the end of this struct.
-   */
-  struct GNUNET_SET_Element element;
-
-  /**
-   * Hash of the element.
-   * Will be used to derive the different IBF keys
-   * for different salts.
-   */
-  struct GNUNET_HashCode element_hash;
-
-  /**
-   * Generation the element was added by the client.
-   * Operations of earlier generations will not consider the element.
+   * Current state of the operation.
    */
    */
-  unsigned int generation_added;
+  enum UnionOperationPhase phase;
 
   /**
 
   /**
-   * GNUNET_YES if the element has been removed in some generation.
+   * Did we send the client that we are done?
    */
    */
-  int removed;
+  int client_done_sent;
 
   /**
 
   /**
-   * Generation the element was removed by the client. 
-   * Operations of later generations will not consider the element.
-   * Only valid if is_removed is GNUNET_YES.
+   * Number of ibf buckets received
    */
    */
-  unsigned int generation_removed;
+  unsigned int ibf_buckets_received;
 
 
-  /**
-   * GNUNET_YES if the element is a remote element, and does not belong
-   * to the operation's set.
-   */
-  int remote;
 };
 
 
 /**
 };
 
 
 /**
- * Entries in the key-to-element map of the union set.
+ * The key entry is used to associate an ibf key with
+ * an element.
  */
 struct KeyEntry
 {
  */
 struct KeyEntry
 {
@@ -250,17 +182,18 @@ struct KeyEntry
   struct IBF_Key ibf_key;
 
   /**
   struct IBF_Key ibf_key;
 
   /**
-   * The actual element associated with the key
+   * The actual element associated with the key.
    */
   struct ElementEntry *element;
 
   /**
    * Element that collides with this element
    */
   struct ElementEntry *element;
 
   /**
    * Element that collides with this element
-   * on the ibf key
+   * on the ibf key. All colliding entries must have the same ibf key.
    */
   struct KeyEntry *next_colliding;
 };
 
    */
   struct KeyEntry *next_colliding;
 };
 
+
 /**
  * Used as a closure for sending elements
  * with a specific IBF key.
 /**
  * Used as a closure for sending elements
  * with a specific IBF key.
@@ -277,14 +210,14 @@ struct SendElementClosure
    * Operation for which the elements
    * should be sent.
    */
    * Operation for which the elements
    * should be sent.
    */
-  struct UnionEvaluateOperation *eo;
+  struct Operation *op;
 };
 
 
 /**
  * Extra state required for efficient set union.
  */
 };
 
 
 /**
  * Extra state required for efficient set union.
  */
-struct UnionState
+struct SetState
 {
   /**
    * The strata estimator is only generated once for
 {
   /**
    * The strata estimator is only generated once for
@@ -293,81 +226,18 @@ struct UnionState
    * salt=0.
    */
   struct StrataEstimator *se;
    * salt=0.
    */
   struct StrataEstimator *se;
-
-  /**
-   * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
-   */
-  struct GNUNET_CONTAINER_MultiHashMap *elements;
-
-  /**
-   * Evaluate operations are held in
-   * a linked list.
-   */
-  struct UnionEvaluateOperation *ops_head;
-
-  /**
-   * Evaluate operations are held in
-   * a linked list.
-   */
-  struct UnionEvaluateOperation *ops_tail;
-
-  /**
-   * Current generation, that is, number of
-   * previously executed operations on this set
-   */
-  unsigned int current_generation;
 };
 
 
 };
 
 
-
-/**
- * Iterator over hash map entries.
- *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
- */
-static int
-destroy_elements_iterator (void *cls,
-                           const struct GNUNET_HashCode * key,
-                           void *value)
-{
-  struct ElementEntry *ee = value;
-
-  GNUNET_free (ee);
-  return GNUNET_YES;
-}
-
-
-/**
- * Destroy the elements belonging to a union set.
- *
- * @param us union state that contains the elements
- */
-static void
-destroy_elements (struct UnionState *us)
-{
-  if (NULL == us->elements)
-    return;
-  GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL);
-  GNUNET_CONTAINER_multihashmap_destroy (us->elements);
-  us->elements = NULL;
-}
-
-
-
 /**
  * Iterator over hash map entries.
  *
  * @param cls closure
  * @param key current key code
  * @param value value in the hash map
 /**
  * Iterator over hash map entries.
  *
  * @param cls closure
  * @param key current key code
  * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
+ * @return #GNUNET_YES if we should continue to
  *         iterate,
  *         iterate,
- *         GNUNET_NO if not.
+ *         #GNUNET_NO if not.
  */
 static int
 destroy_key_to_element_iter (void *cls,
  */
 static int
 destroy_key_to_element_iter (void *cls,
@@ -375,11 +245,16 @@ destroy_key_to_element_iter (void *cls,
                              void *value)
 {
   struct KeyEntry *k = value;
                              void *value)
 {
   struct KeyEntry *k = value;
-  
+  /* destroy the linked list of colliding ibf key entries */
   while (NULL != k)
   {
     struct KeyEntry *k_tmp = k;
     k = k->next_colliding;
   while (NULL != k)
   {
     struct KeyEntry *k_tmp = k;
     k = k->next_colliding;
+    if (GNUNET_YES == k_tmp->element->remote)
+    {
+      GNUNET_free (k_tmp->element);
+      k_tmp->element = NULL;
+    }
     GNUNET_free (k_tmp);
   }
   return GNUNET_YES;
     GNUNET_free (k_tmp);
   }
   return GNUNET_YES;
@@ -387,56 +262,40 @@ destroy_key_to_element_iter (void *cls,
 
 
 /**
 
 
 /**
- * Destroy a union operation, and free all resources
- * associated with it.
+ * Destroy the union operation.  Only things specific to the union operation are destroyed.
  *
  *
- * @param eo the union operation to destroy
+ * @param op union operation to destroy
  */
  */
-void
-_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
+static void
+union_op_cancel (struct Operation *op)
 {
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
-  
-  if (NULL != eo->tc)
-  {
-    GNUNET_MQ_destroy (eo->tc->mq);
-    GNUNET_MESH_tunnel_destroy (eo->tc->tunnel);
-    GNUNET_free (eo->tc);
-    eo->tc = NULL;
-  }
-
-  if (NULL != eo->remote_ibf)
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
+  /* check if the op was canceled twice */
+  GNUNET_assert (NULL != op->state);
+  if (NULL != op->state->remote_ibf)
   {
   {
-    ibf_destroy (eo->remote_ibf);
-    eo->remote_ibf = NULL;
+    ibf_destroy (op->state->remote_ibf);
+    op->state->remote_ibf = NULL;
   }
   }
-  if (NULL != eo->local_ibf)
+  if (NULL != op->state->local_ibf)
   {
   {
-    ibf_destroy (eo->local_ibf);
-    eo->local_ibf = NULL;
+    ibf_destroy (op->state->local_ibf);
+    op->state->local_ibf = NULL;
   }
   }
-  if (NULL != eo->se)
+  if (NULL != op->state->se)
   {
   {
-    strata_estimator_destroy (eo->se);
-    eo->se = NULL;
+    strata_estimator_destroy (op->state->se);
+    op->state->se = NULL;
   }
   }
-  if (NULL != eo->key_to_element)
+  if (NULL != op->state->key_to_element)
   {
   {
-    GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL);
-    GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
-    eo->key_to_element = NULL;
+    GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, destroy_key_to_element_iter, NULL);
+    GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
+    op->state->key_to_element = NULL;
   }
   }
-
-  GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
-                               eo->set->state.u->ops_tail,
-                               eo);
-  GNUNET_free (eo);
-
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n");
-
-
-  /* FIXME: do a garbage collection of the set generations */
+  GNUNET_free (op->state);
+  op->state = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
 }
 
 
 }
 
 
@@ -444,24 +303,27 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
  * Inform the client that the union operation has failed,
  * and proceed to destroy the evaluate operation.
  *
  * Inform the client that the union operation has failed,
  * and proceed to destroy the evaluate operation.
  *
- * @param eo the union operation to fail
+ * @param op the union operation to fail
  */
 static void
  */
 static void
-fail_union_operation (struct UnionEvaluateOperation *eo)
+fail_union_operation (struct Operation *op)
 {
 {
-  struct GNUNET_MQ_Envelope *mqm;
+  struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_SET_ResultMessage *msg;
 
   struct GNUNET_SET_ResultMessage *msg;
 
-  mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
+  GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "union operation failed\n");
+
+  ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
-  msg->request_id = htonl (eo->request_id);
-  GNUNET_MQ_send (eo->set->client_mq, mqm);
-  _GSS_union_operation_destroy (eo);
+  msg->request_id = htonl (op->spec->client_request_id);
+  msg->element_type = htons (0);
+  GNUNET_MQ_send (op->spec->set->client_mq, ev);
+  _GSS_operation_destroy (op, GNUNET_YES);
 }
 
 
 /**
 }
 
 
 /**
- * Derive the IBF key from a hash code and 
+ * Derive the IBF key from a hash code and
  * a salt.
  *
  * @param src the hash code
  * a salt.
  *
  * @param src the hash code
@@ -469,7 +331,7 @@ fail_union_operation (struct UnionEvaluateOperation *eo)
  * @return the derived IBF key
  */
 static struct IBF_Key
  * @return the derived IBF key
  */
 static struct IBF_Key
-get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
+get_ibf_key (const struct GNUNET_HashCode *src, uint16_t salt)
 {
   struct IBF_Key key;
 
 {
   struct IBF_Key key;
 
@@ -483,37 +345,35 @@ get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
 
 
 /**
 
 
 /**
- * Send a request for the evaluate operation to a remote peer
+ * Iterator to create the mapping between ibf keys
+ * and element entries.
  *
  *
- * @param eo operation with the other peer
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return #GNUNET_YES if we should continue to
+ *         iterate,
+ *         #GNUNET_NO if not.
  */
  */
-static void
-send_operation_request (struct UnionEvaluateOperation *eo)
+static int
+op_register_element_iterator (void *cls,
+                              uint32_t key,
+                              void *value)
 {
 {
-  struct GNUNET_MQ_Envelope *mqm;
-  struct OperationRequestMessage *msg;
-
-  mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
-                                 eo->context_msg);
-
-  if (NULL == mqm)
-  {
-    /* the context message is too large */
-    GNUNET_break (0);
-    GNUNET_SERVER_client_disconnect (eo->set->client);
-    return;
-  }
-  msg->operation = htons (GNUNET_SET_OPERATION_UNION);
-  msg->app_id = eo->app_id;
-  GNUNET_MQ_send (eo->tc->mq, mqm);
+  struct KeyEntry *const new_k = cls;
+  struct KeyEntry *old_k = value;
 
 
-  if (NULL != eo->context_msg)
+  GNUNET_assert (NULL != old_k);
+  /* check if our ibf key collides with the ibf key in the existing entry */
+  if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
   {
   {
-    GNUNET_free (eo->context_msg);
-    eo->context_msg = NULL;
+    /* insert the the new key in the collision chain */
+    new_k->next_colliding = old_k->next_colliding;
+    old_k->next_colliding = new_k;
+    /* signal to the caller that we were able to insert into a colliding bucket */
+    return GNUNET_NO;
   }
   }
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
+  return GNUNET_YES;
 }
 
 
 }
 
 
@@ -524,61 +384,88 @@ send_operation_request (struct UnionEvaluateOperation *eo)
  * @param cls closure
  * @param key current key code
  * @param value value in the hash map
  * @param cls closure
  * @param key current key code
  * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
+ * @return #GNUNET_YES if we should continue to
  *         iterate,
  *         iterate,
- *         GNUNET_NO if not.
+ *         #GNUNET_NO if not.
  */
 static int
  */
 static int
-insert_element_iterator (void *cls,
+op_has_element_iterator (void *cls,
                          uint32_t key,
                          void *value)
 {
                          uint32_t key,
                          void *value)
 {
-  struct KeyEntry *const new_k = cls;
-  struct KeyEntry *old_k = value;
+  struct GNUNET_HashCode *element_hash = cls;
+  struct KeyEntry *k = value;
 
 
-  GNUNET_assert (NULL != old_k);
-  do
+  GNUNET_assert (NULL != k);
+  while (NULL != k)
   {
   {
-    if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
-    {
-      new_k->next_colliding = old_k->next_colliding;
-      old_k->next_colliding = new_k;
+    if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, element_hash))
       return GNUNET_NO;
       return GNUNET_NO;
-    }
-    old_k = old_k->next_colliding;
-  } while (NULL != old_k);
+    k = k->next_colliding;
+  }
   return GNUNET_YES;
 }
 
 
   return GNUNET_YES;
 }
 
 
+/**
+ * Determine whether the given element is already in the operation's element
+ * set.
+ *
+ * @param op operation that should be tested for 'element_hash'
+ * @param element_hash hash of the element to look for
+ * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
+ */
+static int
+op_has_element (struct Operation *op, const struct GNUNET_HashCode *element_hash)
+{
+  int ret;
+  struct IBF_Key ibf_key;
+
+  ibf_key = get_ibf_key (element_hash, op->spec->salt);
+  ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
+                                                      (uint32_t) ibf_key.key_val,
+                                                      op_has_element_iterator, (void *) element_hash);
+
+  /* was the iteration aborted because we found the element? */
+  if (GNUNET_SYSERR == ret)
+    return GNUNET_YES;
+  return GNUNET_NO;
+}
+
+
 /**
  * Insert an element into the union operation's
 /**
  * Insert an element into the union operation's
- * key-to-element mapping
+ * key-to-element mapping. Takes ownership of 'ee'.
+ * Note that this does not insert the element in the set,
+ * only in the operation's key-element mapping.
+ * This is done to speed up re-tried operations, if some elements
+ * were transmitted, and then the IBF fails to decode.
  *
  *
- * @param eo the union operation
+ * @param op the union operation
  * @param ee the element entry
  */
 static void
  * @param ee the element entry
  */
 static void
-insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
+op_register_element (struct Operation *op,
+                     struct ElementEntry *ee)
 {
   int ret;
   struct IBF_Key ibf_key;
   struct KeyEntry *k;
 
 {
   int ret;
   struct IBF_Key ibf_key;
   struct KeyEntry *k;
 
-  ibf_key = get_ibf_key (&ee->element_hash, eo->salt);
+  ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
   k = GNUNET_new (struct KeyEntry);
   k->element = ee;
   k->ibf_key = ibf_key;
   k = GNUNET_new (struct KeyEntry);
   k->element = ee;
   k->ibf_key = ibf_key;
-  ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
+  ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
                                                       (uint32_t) ibf_key.key_val,
                                                       (uint32_t) ibf_key.key_val,
-                                                      insert_element_iterator, k);
+                                                      op_register_element_iterator, k);
 
   /* was the element inserted into a colliding bucket? */
   if (GNUNET_SYSERR == ret)
     return;
 
 
   /* was the element inserted into a colliding bucket? */
   if (GNUNET_SYSERR == ret)
     return;
 
-  GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
-                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t) ibf_key.key_val, k,
+                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
 }
 
 
 }
 
 
@@ -597,6 +484,8 @@ prepare_ibf_iterator (void *cls,
   struct InvertibleBloomFilter *ibf = cls;
   struct KeyEntry *ke = value;
 
   struct InvertibleBloomFilter *ibf = cls;
   struct KeyEntry *ke = value;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting %x into ibf\n", ke->ibf_key.key_val);
+
   ibf_insert (ibf, ke->ibf_key);
   return GNUNET_YES;
 }
   ibf_insert (ibf, ke->ibf_key);
   return GNUNET_YES;
 }
@@ -610,23 +499,27 @@ prepare_ibf_iterator (void *cls,
  * @param key unised
  * @param value the element entry to insert
  *        into the key-to-element mapping
  * @param key unised
  * @param value the element entry to insert
  *        into the key-to-element mapping
+ * @return GNUNET_YES to continue iterating,
+ *         GNUNET_NO to stop
  */
 static int
 init_key_to_element_iterator (void *cls,
                               const struct GNUNET_HashCode *key,
                               void *value)
 {
  */
 static int
 init_key_to_element_iterator (void *cls,
                               const struct GNUNET_HashCode *key,
                               void *value)
 {
-  struct UnionEvaluateOperation *eo = cls;
+  struct Operation *op = cls;
   struct ElementEntry *e = value;
 
   /* make sure that the element belongs to the set at the time
    * of creating the operation */
   struct ElementEntry *e = value;
 
   /* make sure that the element belongs to the set at the time
    * of creating the operation */
-  if ( (e->generation_added > eo->generation_created) ||
+  if ( (e->generation_added > op->generation_created) ||
        ( (GNUNET_YES == e->removed) &&
        ( (GNUNET_YES == e->removed) &&
-         (e->generation_removed < eo->generation_created)))
+         (e->generation_removed < op->generation_created)))
     return GNUNET_YES;
 
     return GNUNET_YES;
 
-  insert_element (eo, e);
+  GNUNET_assert (GNUNET_NO == e->remote);
+
+  op_register_element (op, e);
   return GNUNET_YES;
 }
 
   return GNUNET_YES;
 }
 
@@ -635,50 +528,50 @@ init_key_to_element_iterator (void *cls,
  * Create an ibf with the operation's elements
  * of the specified size
  *
  * Create an ibf with the operation's elements
  * of the specified size
  *
- * @param eo the union operation
+ * @param op the union operation
  * @param size size of the ibf to create
  */
 static void
  * @param size size of the ibf to create
  */
 static void
-prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
+prepare_ibf (struct Operation *op, uint16_t size)
 {
 {
-  if (NULL == eo->key_to_element)
+  if (NULL == op->state->key_to_element)
   {
     unsigned int len;
   {
     unsigned int len;
-    len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
-    eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
-    GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
-                                             init_key_to_element_iterator, eo);
+    len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements);
+    op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
+    GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
+                                           init_key_to_element_iterator, op);
   }
   }
-  if (NULL != eo->local_ibf)
-    ibf_destroy (eo->local_ibf);
-  eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
-  GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
-                                           prepare_ibf_iterator, eo->local_ibf);
+  if (NULL != op->state->local_ibf)
+    ibf_destroy (op->state->local_ibf);
+  op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
+  GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+                                           prepare_ibf_iterator, op->state->local_ibf);
 }
 
 
 /**
  * Send an ibf of appropriate size.
  *
 }
 
 
 /**
  * Send an ibf of appropriate size.
  *
- * @param eo the union operation
+ * @param op the union operation
  * @param ibf_order order of the ibf to send, size=2^order
  */
 static void
  * @param ibf_order order of the ibf to send, size=2^order
  */
 static void
-send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
+send_ibf (struct Operation *op, uint16_t ibf_order)
 {
   unsigned int buckets_sent = 0;
   struct InvertibleBloomFilter *ibf;
 
 {
   unsigned int buckets_sent = 0;
   struct InvertibleBloomFilter *ibf;
 
-  prepare_ibf (eo, 1<<ibf_order);
+  prepare_ibf (op, 1<<ibf_order);
 
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n", 1<<ibf_order);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
 
 
-  ibf = eo->local_ibf;
+  ibf = op->state->local_ibf;
 
   while (buckets_sent < (1 << ibf_order))
   {
     unsigned int buckets_in_message;
 
   while (buckets_sent < (1 << ibf_order))
   {
     unsigned int buckets_in_message;
-    struct GNUNET_MQ_Envelope *mqm;
+    struct GNUNET_MQ_Envelope *ev;
     struct IBFMessage *msg;
 
     buckets_in_message = (1 << ibf_order) - buckets_sent;
     struct IBFMessage *msg;
 
     buckets_in_message = (1 << ibf_order) - buckets_sent;
@@ -686,37 +579,41 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
 
     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
 
-    mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
-                               GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
+    ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
+                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
+    msg->reserved = 0;
     msg->order = ibf_order;
     msg->offset = htons (buckets_sent);
     ibf_write_slice (ibf, buckets_sent,
                      buckets_in_message, &msg[1]);
     buckets_sent += buckets_in_message;
     msg->order = ibf_order;
     msg->offset = htons (buckets_sent);
     ibf_write_slice (ibf, buckets_sent,
                      buckets_in_message, &msg[1]);
     buckets_sent += buckets_in_message;
-    GNUNET_MQ_send (eo->tc->mq, mqm);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
+                buckets_in_message, buckets_sent, 1<<ibf_order);
+    GNUNET_MQ_send (op->mq, ev);
   }
 
   }
 
-  eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
+  op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
 }
 
 
 /**
  * Send a strata estimator to the remote peer.
  *
 }
 
 
 /**
  * Send a strata estimator to the remote peer.
  *
- * @param eo the union operation with the remote peer
+ * @param op the union operation with the remote peer
  */
 static void
  */
 static void
-send_strata_estimator (struct UnionEvaluateOperation *eo)
+send_strata_estimator (struct Operation *op)
 {
 {
-  struct GNUNET_MQ_Envelope *mqm;
+  struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_MessageHeader *strata_msg;
 
   struct GNUNET_MessageHeader *strata_msg;
 
-  mqm = GNUNET_MQ_msg_header_extra (strata_msg,
-                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
-                                    GNUNET_MESSAGE_TYPE_SET_P2P_SE);
-  strata_estimator_write (eo->set->state.u->se, &strata_msg[1]);
-  GNUNET_MQ_send (eo->tc->mq, mqm);
-  eo->phase = PHASE_EXPECT_IBF;
+  ev = GNUNET_MQ_msg_header_extra (strata_msg,
+                                   SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
+                                   GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE);
+  strata_estimator_write (op->state->se, &strata_msg[1]);
+  GNUNET_MQ_send (op->mq, ev);
+  op->state->phase = PHASE_EXPECT_IBF;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
 }
 
 
 }
 
 
@@ -733,7 +630,7 @@ get_order_from_difference (unsigned int diff)
   unsigned int ibf_order;
 
   ibf_order = 2;
   unsigned int ibf_order;
 
   ibf_order = 2;
-  while ((1<<ibf_order) < (2 * diff))
+  while ((1<<ibf_order) < (IBF_ALPHA * diff) || (1<<ibf_order) < SE_IBF_HASH_NUM)
     ibf_order++;
   if (ibf_order > MAX_IBF_ORDER)
     ibf_order = MAX_IBF_ORDER;
     ibf_order++;
   if (ibf_order > MAX_IBF_ORDER)
     ibf_order = MAX_IBF_ORDER;
@@ -750,27 +647,27 @@ get_order_from_difference (unsigned int diff)
 static void
 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
 {
 static void
 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
 {
-  struct UnionEvaluateOperation *eo = cls;
+  struct Operation *op = cls;
   struct StrataEstimator *remote_se;
   int diff;
 
   struct StrataEstimator *remote_se;
   int diff;
 
-
-  if (eo->phase != PHASE_EXPECT_SE)
+  if (op->state->phase != PHASE_EXPECT_SE)
   {
   {
-    fail_union_operation (eo);
+    fail_union_operation (op);
     GNUNET_break (0);
     return;
   }
   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
                                        SE_IBF_HASH_NUM);
   strata_estimator_read (&mh[1], remote_se);
     GNUNET_break (0);
     return;
   }
   remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
                                        SE_IBF_HASH_NUM);
   strata_estimator_read (&mh[1], remote_se);
-  GNUNET_assert (NULL != eo->se);
-  diff = strata_estimator_difference (remote_se, eo->se);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff);
+  GNUNET_assert (NULL != op->state->se);
+  diff = strata_estimator_difference (remote_se, op->state->se);
   strata_estimator_destroy (remote_se);
   strata_estimator_destroy (remote_se);
-  strata_estimator_destroy (eo->se);
-  eo->se = NULL;
-  send_ibf (eo, get_order_from_difference (diff));
+  strata_estimator_destroy (op->state->se);
+  op->state->se = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n",
+              diff, 1<<get_order_from_difference (diff));
+  send_ibf (op, get_order_from_difference (diff));
 }
 
 
 }
 
 
@@ -789,7 +686,7 @@ send_element_iterator (void *cls,
 {
   struct SendElementClosure *sec = cls;
   struct IBF_Key ibf_key = sec->ibf_key;
 {
   struct SendElementClosure *sec = cls;
   struct IBF_Key ibf_key = sec->ibf_key;
-  struct UnionEvaluateOperation *eo = sec->eo;
+  struct Operation *op = sec->op;
   struct KeyEntry *ke = value;
 
   if (ke->ibf_key.key_val != ibf_key.key_val)
   struct KeyEntry *ke = value;
 
   if (ke->ibf_key.key_val != ibf_key.key_val)
@@ -797,20 +694,21 @@ send_element_iterator (void *cls,
   while (NULL != ke)
   {
     const struct GNUNET_SET_Element *const element = &ke->element->element;
   while (NULL != ke)
   {
     const struct GNUNET_SET_Element *const element = &ke->element->element;
-    struct GNUNET_MQ_Envelope *mqm;
+    struct GNUNET_MQ_Envelope *ev;
     struct GNUNET_MessageHeader *mh;
 
     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
     struct GNUNET_MessageHeader *mh;
 
     GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
-    mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
-    if (NULL == mqm)
+    ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
+    if (NULL == ev)
     {
       /* element too large */
       GNUNET_break (0);
       continue;
     }
     memcpy (&mh[1], element->data, element->size);
     {
       /* element too large */
       GNUNET_break (0);
       continue;
     }
     memcpy (&mh[1], element->data, element->size);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
-    GNUNET_MQ_send (eo->tc->mq, mqm);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n",
+                GNUNET_h2s (&ke->element->element_hash));
+    GNUNET_MQ_send (op->mq, ev);
     ke = ke->next_colliding;
   }
   return GNUNET_NO;
     ke = ke->next_colliding;
   }
   return GNUNET_NO;
@@ -820,18 +718,19 @@ send_element_iterator (void *cls,
  * Send all elements that have the specified IBF key
  * to the remote peer of the union operation
  *
  * Send all elements that have the specified IBF key
  * to the remote peer of the union operation
  *
- * @param eo union operation
+ * @param op union operation
  * @param ibf_key IBF key of interest
  */
 static void
  * @param ibf_key IBF key of interest
  */
 static void
-send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key)
+send_elements_for_key (struct Operation *op, struct IBF_Key ibf_key)
 {
   struct SendElementClosure send_cls;
 
   send_cls.ibf_key = ibf_key;
 {
   struct SendElementClosure send_cls;
 
   send_cls.ibf_key = ibf_key;
-  send_cls.eo = eo;
-  GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
-                                                &send_element_iterator, &send_cls);
+  send_cls.op = op;
+  (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
+                                                       (uint32_t) ibf_key.key_val,
+                                                       &send_element_iterator, &send_cls);
 }
 
 
 }
 
 
@@ -839,27 +738,52 @@ send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key
  * Decode which elements are missing on each side, and
  * send the appropriate elemens and requests
  *
  * Decode which elements are missing on each side, and
  * send the appropriate elemens and requests
  *
- * @param eo union operation
+ * @param op union operation
  */
 static void
  */
 static void
-decode_and_send (struct UnionEvaluateOperation *eo)
+decode_and_send (struct Operation *op)
 {
   struct IBF_Key key;
 {
   struct IBF_Key key;
+  struct IBF_Key last_key;
   int side;
   int side;
+  unsigned int num_decoded;
   struct InvertibleBloomFilter *diff_ibf;
 
   struct InvertibleBloomFilter *diff_ibf;
 
-  GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
+  GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase);
+
+  prepare_ibf (op, op->state->remote_ibf->size);
+  diff_ibf = ibf_dup (op->state->local_ibf);
+  ibf_subtract (diff_ibf, op->state->remote_ibf);
+
+  ibf_destroy (op->state->remote_ibf);
+  op->state->remote_ibf = NULL;
 
 
-  prepare_ibf (eo, eo->remote_ibf->size);
-  diff_ibf = ibf_dup (eo->local_ibf);
-  ibf_subtract (diff_ibf, eo->remote_ibf);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size);
+
+  num_decoded = 0;
+  last_key.key_val = 0;
 
   while (1)
   {
     int res;
 
   while (1)
   {
     int res;
+    int cycle_detected = GNUNET_NO;
+
+    last_key = key;
 
     res = ibf_decode (diff_ibf, &side, &key);
 
     res = ibf_decode (diff_ibf, &side, &key);
-    if (GNUNET_SYSERR == res)
+    if (res == GNUNET_OK)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n",
+                  key.key_val);
+      num_decoded += 1;
+      if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val))
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n",
+                    num_decoded, diff_ibf->size);
+        cycle_detected = GNUNET_YES;
+      }
+    }
+    if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected))
     {
       int next_order;
       next_order = 0;
     {
       int next_order;
       next_order = 0;
@@ -868,42 +792,48 @@ decode_and_send (struct UnionEvaluateOperation *eo)
       next_order++;
       if (next_order <= MAX_IBF_ORDER)
       {
       next_order++;
       if (next_order <= MAX_IBF_ORDER)
       {
-        GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                    "decoding failed, sending larger ibf (size %u)\n",
                     1<<next_order);
                    "decoding failed, sending larger ibf (size %u)\n",
                     1<<next_order);
-        send_ibf (eo, next_order);
+        send_ibf (op, next_order);
       }
       else
       {
       }
       else
       {
-        GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 
+        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                    "set union failed: reached ibf limit\n");
       }
       break;
     }
     if (GNUNET_NO == res)
     {
                    "set union failed: reached ibf limit\n");
       }
       break;
     }
     if (GNUNET_NO == res)
     {
-      struct GNUNET_MQ_Envelope *mqm;
+      struct GNUNET_MQ_Envelope *ev;
 
 
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n");
-      mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
-      GNUNET_MQ_send (eo->tc->mq, mqm);
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
+      ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
+      GNUNET_MQ_send (op->mq, ev);
       break;
     }
     if (1 == side)
     {
       break;
     }
     if (1 == side)
     {
-      send_elements_for_key (eo, key);
+      send_elements_for_key (op, key);
     }
     }
-    else
+    else if (-1 == side)
     {
     {
-      struct GNUNET_MQ_Envelope *mqm;
+      struct GNUNET_MQ_Envelope *ev;
       struct GNUNET_MessageHeader *msg;
 
       struct GNUNET_MessageHeader *msg;
 
-      /* FIXME: before sending the request, check if we may just have the element */
-      /* FIXME: merge multiple requests */
-      mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
+      /* It may be nice to merge multiple requests, but with cadet's corking it is not worth
+       * the effort additional complexity. */
+      ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
                                         GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
+
       *(struct IBF_Key *) &msg[1] = key;
       *(struct IBF_Key *) &msg[1] = key;
-      GNUNET_MQ_send (eo->tc->mq, mqm);
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n");
+      GNUNET_MQ_send (op->mq, ev);
+    }
+    else
+    {
+      GNUNET_assert (0);
     }
   }
   ibf_destroy (diff_ibf);
     }
   }
   ibf_destroy (diff_ibf);
@@ -919,52 +849,60 @@ decode_and_send (struct UnionEvaluateOperation *eo)
 static void
 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
 {
 static void
 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
 {
-  struct UnionEvaluateOperation *eo = cls;
+  struct Operation *op = cls;
   struct IBFMessage *msg = (struct IBFMessage *) mh;
   unsigned int buckets_in_message;
 
   struct IBFMessage *msg = (struct IBFMessage *) mh;
   unsigned int buckets_in_message;
 
-  if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
-       (eo->phase == PHASE_EXPECT_IBF) )
+  if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
+       (op->state->phase == PHASE_EXPECT_IBF) )
   {
   {
-    eo->phase = PHASE_EXPECT_IBF_CONT;
-    GNUNET_assert (NULL == eo->remote_ibf);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<<msg->order);
-    eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
+    op->state->phase = PHASE_EXPECT_IBF_CONT;
+    GNUNET_assert (NULL == op->state->remote_ibf);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
+    op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
+    op->state->ibf_buckets_received = 0;
     if (0 != ntohs (msg->offset))
     {
       GNUNET_break (0);
     if (0 != ntohs (msg->offset))
     {
       GNUNET_break (0);
-      fail_union_operation (eo);
+      fail_union_operation (op);
+      return;
     }
   }
     }
   }
-  else if (eo->phase == PHASE_EXPECT_IBF_CONT)
+  else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
   {
   {
-    if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
-         (1<<msg->order != eo->remote_ibf->size) )
+    if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
+         (1<<msg->order != op->state->remote_ibf->size) )
     {
       GNUNET_break (0);
     {
       GNUNET_break (0);
-      fail_union_operation (eo);
+      fail_union_operation (op);
       return;
     }
   }
 
   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
 
       return;
     }
   }
 
   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
 
+  if (0 == buckets_in_message)
+  {
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+
   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
   {
     GNUNET_break (0);
   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
   {
     GNUNET_break (0);
-    fail_union_operation (eo);
+    fail_union_operation (op);
     return;
   }
     return;
   }
-  
-  ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
-  eo->ibf_buckets_received += buckets_in_message;
 
 
-  if (eo->ibf_buckets_received == eo->remote_ibf->size)
-  {
+  ibf_read_slice (&msg[1], op->state->ibf_buckets_received, buckets_in_message, op->state->remote_ibf);
+  op->state->ibf_buckets_received += buckets_in_message;
 
 
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n");
-    eo->phase = PHASE_EXPECT_ELEMENTS;
-    decode_and_send (eo);
+  if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
+    op->state->phase = PHASE_EXPECT_ELEMENTS;
+    decode_and_send (op);
   }
 }
 
   }
 }
 
@@ -973,28 +911,109 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
  * Send a result message to the client indicating
  * that there is a new element.
  *
  * Send a result message to the client indicating
  * that there is a new element.
  *
- * @param eo union operation
+ * @param op union operation
  * @param element element to send
  */
 static void
  * @param element element to send
  */
 static void
-send_client_element (struct UnionEvaluateOperation *eo,
+send_client_element (struct Operation *op,
                      struct GNUNET_SET_Element *element)
 {
                      struct GNUNET_SET_Element *element)
 {
-  struct GNUNET_MQ_Envelope *mqm;
+  struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_SET_ResultMessage *rm;
 
   struct GNUNET_SET_ResultMessage *rm;
 
-  GNUNET_assert (0 != eo->request_id);
-  mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
-  if (NULL == mqm)
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
+  GNUNET_assert (0 != op->spec->client_request_id);
+  ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
+  if (NULL == ev)
   {
   {
-    GNUNET_MQ_discard (mqm);
+    GNUNET_MQ_discard (ev);
     GNUNET_break (0);
     return;
   }
   rm->result_status = htons (GNUNET_SET_STATUS_OK);
     GNUNET_break (0);
     return;
   }
   rm->result_status = htons (GNUNET_SET_STATUS_OK);
-  rm->request_id = htonl (eo->request_id);
+  rm->request_id = htonl (op->spec->client_request_id);
+  rm->element_type = element->element_type;
   memcpy (&rm[1], element->data, element->size);
   memcpy (&rm[1], element->data, element->size);
-  GNUNET_MQ_send (eo->set->client_mq, mqm);
+  GNUNET_MQ_send (op->spec->set->client_mq, ev);
+}
+
+
+/**
+ * Signal to the client that the operation has finished and
+ * destroy the operation.
+ *
+ * @param cls operation to destroy
+ */
+static void
+send_done_and_destroy (void *cls)
+{
+  struct Operation *op = cls;
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_SET_ResultMessage *rm;
+  int keep = op->keep;
+
+  ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
+  rm->request_id = htonl (op->spec->client_request_id);
+  rm->result_status = htons (GNUNET_SET_STATUS_DONE);
+  rm->element_type = htons (0);
+  GNUNET_MQ_send (op->spec->set->client_mq, ev);
+  _GSS_operation_destroy (op, GNUNET_YES);
+  if (GNUNET_YES == keep)
+    GNUNET_free (op);
+}
+
+
+/**
+ * Send all remaining elements in the full result iterator.
+ *
+ * @param cls operation
+ */
+static void
+send_remaining_elements (void *cls)
+{
+  struct Operation *op = cls;
+  struct KeyEntry *ke;
+  int res;
+
+  res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, NULL, (const void **) &ke);
+  if (GNUNET_NO == res)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
+    send_done_and_destroy (op);
+    return;
+  }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending elements from key entry\n");
+
+  while (1)
+  {
+    struct GNUNET_MQ_Envelope *ev;
+    struct GNUNET_SET_ResultMessage *rm;
+    struct GNUNET_SET_Element *element;
+    element = &ke->element->element;
+
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
+    GNUNET_assert (0 != op->spec->client_request_id);
+    ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
+    if (NULL == ev)
+    {
+      GNUNET_MQ_discard (ev);
+      GNUNET_break (0);
+      continue;
+    }
+    rm->result_status = htons (GNUNET_SET_STATUS_OK);
+    rm->request_id = htonl (op->spec->client_request_id);
+    rm->element_type = element->element_type;
+    memcpy (&rm[1], element->data, element->size);
+    if (ke->next_colliding == NULL)
+    {
+      GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
+      GNUNET_MQ_send (op->spec->set->client_mq, ev);
+      break;
+    }
+    GNUNET_MQ_send (op->spec->set->client_mq, ev);
+    ke = ke->next_colliding;
+  }
 }
 
 
 }
 
 
@@ -1004,20 +1023,25 @@ send_client_element (struct UnionEvaluateOperation *eo,
  * After the result done message has been sent to the client,
  * destroy the evaluate operation.
  *
  * After the result done message has been sent to the client,
  * destroy the evaluate operation.
  *
- * @param eo union operation
+ * @param op union operation
  */
 static void
  */
 static void
-send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
+finish_and_destroy (struct Operation *op)
 {
 {
-  struct GNUNET_MQ_Envelope *mqm;
-  struct GNUNET_SET_ResultMessage *rm;
-
-  GNUNET_assert (0 != eo->request_id);
-  mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
-  rm->request_id = htonl (eo->request_id);
-  rm->result_status = htons (GNUNET_SET_STATUS_DONE);
-  GNUNET_MQ_send (eo->set->client_mq, mqm);
+  GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
 
 
+  if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
+  {
+    /* prevent that the op is free'd by the tunnel end handler */
+    op->keep = GNUNET_YES;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
+    GNUNET_assert (NULL == op->state->full_result_iter);
+    op->state->full_result_iter =
+        GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element);
+    send_remaining_elements (op);
+    return;
+  }
+  send_done_and_destroy (op);
 }
 
 
 }
 
 
@@ -1030,29 +1054,42 @@ send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
 static void
 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
 {
 static void
 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
 {
-  struct UnionEvaluateOperation *eo = cls;
+  struct Operation *op = cls;
   struct ElementEntry *ee;
   uint16_t element_size;
 
   struct ElementEntry *ee;
   uint16_t element_size;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "got element from peer\n");
 
 
-  if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
-       (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
+  if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) &&
+       (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
   {
   {
-    fail_union_operation (eo);
+    fail_union_operation (op);
     GNUNET_break (0);
     return;
   }
   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
     GNUNET_break (0);
     return;
   }
   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
-  ee = GNUNET_malloc (sizeof *eo + element_size);
+  ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
   memcpy (&ee[1], &mh[1], element_size);
   memcpy (&ee[1], &mh[1], element_size);
+  ee->element.size = element_size;
   ee->element.data = &ee[1];
   ee->remote = GNUNET_YES;
   ee->element.data = &ee[1];
   ee->remote = GNUNET_YES;
+  GNUNET_CRYPTO_hash (ee->element.data,
+                      ee->element.size,
+                      &ee->element_hash);
 
 
-  insert_element (eo, ee);
-  send_client_element (eo, &ee->element);
+  if (GNUNET_YES == op_has_element (op, &ee->element_hash))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "got existing element from peer\n");
+    GNUNET_free (ee);
+    return;
+  }
 
 
-  GNUNET_free (ee);
+  op_register_element (op, ee);
+  /* only send results immediately if the client wants it */
+  if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode)
+    send_client_element (op, &ee->element);
 }
 
 
 }
 
 
@@ -1063,17 +1100,18 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
  * @param mh the message
  */
 static void
  * @param mh the message
  */
 static void
-handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
+handle_p2p_element_requests (void *cls,
+                             const struct GNUNET_MessageHeader *mh)
 {
 {
-  struct UnionEvaluateOperation *eo = cls;
+  struct Operation *op = cls;
   struct IBF_Key *ibf_key;
   unsigned int num_keys;
 
   /* look up elements and send them */
   struct IBF_Key *ibf_key;
   unsigned int num_keys;
 
   /* look up elements and send them */
-  if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
+  if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
   {
     GNUNET_break (0);
   {
     GNUNET_break (0);
-    fail_union_operation (eo);
+    fail_union_operation (op);
     return;
   }
 
     return;
   }
 
@@ -1082,320 +1120,270 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
   {
     GNUNET_break (0);
   if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
   {
     GNUNET_break (0);
-    fail_union_operation (eo);
+    fail_union_operation (op);
     return;
   }
 
   ibf_key = (struct IBF_Key *) &mh[1];
   while (0 != num_keys--)
   {
     return;
   }
 
   ibf_key = (struct IBF_Key *) &mh[1];
   while (0 != num_keys--)
   {
-    send_elements_for_key (eo, *ibf_key);
+    send_elements_for_key (op, *ibf_key);
     ibf_key++;
   }
 }
 
 
     ibf_key++;
   }
 }
 
 
-/**
- * Callback used for notifications
- *
- * @param cls closure
- */
-static void
-peer_done_sent_cb (void *cls)
-{
-  struct UnionEvaluateOperation *eo = cls;
-
-  send_client_done_and_destroy (eo);
-}
-
-
 /**
  * Handle a done message from a remote peer
 /**
  * Handle a done message from a remote peer
- * 
+ *
  * @param cls the union operation
  * @param mh the message
  */
 static void
 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
 {
  * @param cls the union operation
  * @param mh the message
  */
 static void
 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
 {
-  struct UnionEvaluateOperation *eo = cls;
+  struct Operation *op = cls;
+  struct GNUNET_MQ_Envelope *ev;
 
 
-  if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
+  if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
   {
     /* we got all requests, but still have to send our elements as response */
   {
     /* we got all requests, but still have to send our elements as response */
-    struct GNUNET_MQ_Envelope *mqm;
 
 
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n");
-    eo->phase = PHASE_FINISHED;
-    mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
-    GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo);
-    GNUNET_MQ_send (eo->tc->mq, mqm);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
+    op->state->phase = PHASE_FINISHED;
+    ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
+    GNUNET_MQ_send (op->mq, ev);
     return;
   }
     return;
   }
-  if (eo->phase == PHASE_EXPECT_ELEMENTS)
+  if (op->state->phase == PHASE_EXPECT_ELEMENTS)
   {
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n");
-    eo->phase = PHASE_FINISHED;
-    send_client_done_and_destroy (eo);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
+    op->state->phase = PHASE_FINISHED;
+    finish_and_destroy (op);
     return;
   }
   GNUNET_break (0);
     return;
   }
   GNUNET_break (0);
-  fail_union_operation (eo);
+  fail_union_operation (op);
 }
 
 
 /**
 }
 
 
 /**
- * Evaluate a union operation with
- * a remote peer.
+ * Initiate operation to evaluate a set union with a remote peer.
  *
  *
- * @param m the evaluate request message from the client
- * @param set the set to evaluate the operation with
+ * @param op operation to perform (to be initialized)
+ * @param opaque_context message to be transmitted to the listener
+ *        to convince him to accept, may be NULL
  */
  */
-void
-_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set)
+static void
+union_evaluate (struct Operation *op,
+                const struct GNUNET_MessageHeader *opaque_context)
 {
 {
-  struct UnionEvaluateOperation *eo;
-  struct GNUNET_MessageHeader *context_msg;
-
-  eo = GNUNET_new (struct UnionEvaluateOperation);
-  eo->peer = m->target_peer;
-  eo->set = set;
-  eo->request_id = htonl (m->request_id);
-  GNUNET_assert (0 != eo->request_id);
-  eo->se = strata_estimator_dup (set->state.u->se);
-  eo->salt = ntohs (m->salt);
-  eo->app_id = m->app_id;
-  
-  context_msg = GNUNET_MQ_extract_nested_mh (m);
-  if (NULL != context_msg)
-  {
-    eo->context_msg = GNUNET_copy_message (context_msg);
-  }
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
-             "evaluating union operation, (app %s)\n", 
-              GNUNET_h2s (&eo->app_id));
+  struct GNUNET_MQ_Envelope *ev;
+  struct OperationRequestMessage *msg;
 
 
-  eo->tc = GNUNET_new (struct TunnelContext);
-  eo->tc->tunnel = GNUNET_MESH_tunnel_create (mesh, eo->tc, &eo->peer,
-                                              GNUNET_APPLICATION_TYPE_SET);
-  GNUNET_assert (NULL != eo->tc->tunnel);
-  eo->tc->peer = eo->peer;
-  eo->tc->mq = GNUNET_MESH_mq_create (eo->tc->tunnel);
+  op->state = GNUNET_new (struct OperationState);
+  /* copy the current generation's strata estimator for this operation */
+  op->state->se = strata_estimator_dup (op->spec->set->state->se);
   /* we started the operation, thus we have to send the operation request */
   /* we started the operation, thus we have to send the operation request */
-  eo->phase = PHASE_EXPECT_SE;
-
-  GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
-                               eo->set->state.u->ops_tail,
-                               eo);
-
-  send_operation_request (eo);
+  op->state->phase = PHASE_EXPECT_SE;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Initiating union operation evaluation\n");
+  ev = GNUNET_MQ_msg_nested_mh (msg,
+                                GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
+                                opaque_context);
+  if (NULL == ev)
+  {
+    /* the context message is too large */
+    GNUNET_break (0);
+    GNUNET_SERVER_client_disconnect (op->spec->set->client);
+    return;
+  }
+  msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
+  msg->app_id = op->spec->app_id;
+  msg->salt = htonl (op->spec->salt);
+  GNUNET_MQ_send (op->mq, ev);
+
+  if (NULL != opaque_context)
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "sent op request with context message\n");
+  else
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "sent op request without context message\n");
 }
 
 
 /**
 }
 
 
 /**
- * Accept an union operation request from a remote peer
+ * Accept an union operation request from a remote peer.
+ * Only initializes the private operation state.
  *
  *
- * @param m the accept message from the client
- * @param set the set of the client
- * @param incoming information about the requesting remote peer
+ * @param op operation that will be accepted as a union operation
  */
  */
-void
-_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
-                   struct Incoming *incoming)
+static void
+union_accept (struct Operation *op)
 {
 {
-  struct UnionEvaluateOperation *eo;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
-
-  eo = GNUNET_new (struct UnionEvaluateOperation);
-  eo->tc = incoming->tc;
-  eo->generation_created = set->state.u->current_generation++;
-  eo->set = set;
-  eo->salt = ntohs (incoming->salt);
-  GNUNET_assert (0 != ntohl (m->request_id));
-  eo->request_id = ntohl (m->request_id);
-  eo->se = strata_estimator_dup (set->state.u->se);
-  /* transfer ownership of mq and socket from incoming to eo */
-  GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
-                               eo->set->state.u->ops_tail,
-                               eo);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
+  op->state = GNUNET_new (struct OperationState);
+  op->state->se = strata_estimator_dup (op->spec->set->state->se);
   /* kick off the operation */
   /* kick off the operation */
-  send_strata_estimator (eo);
+  send_strata_estimator (op);
 }
 
 
 /**
  * Create a new set supporting the union operation
  *
 }
 
 
 /**
  * Create a new set supporting the union operation
  *
+ * We maintain one strata estimator per set and then manipulate it over the
+ * lifetime of the set, as recreating a strata estimator would be expensive.
+ *
  * @return the newly created set
  */
  * @return the newly created set
  */
-struct Set *
-_GSS_union_set_create (void)
+static struct SetState *
+union_set_create (void)
 {
 {
-  struct Set *set;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "union set created\n");
-  
-  set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
-  set->state.u = (struct UnionState *) &set[1];
-  set->operation = GNUNET_SET_OPERATION_UNION;
-  /* keys of the hash map are stored in the element entrys, thus we do not
-   * want the hash map to copy them */
-  set->state.u->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
-  set->state.u->se = strata_estimator_create (SE_STRATA_COUNT,
-                                              SE_IBF_SIZE, SE_IBF_HASH_NUM);  
-  return set;
+  struct SetState *set_state;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n");
+
+  set_state = GNUNET_new (struct SetState);
+  set_state->se = strata_estimator_create (SE_STRATA_COUNT,
+                                              SE_IBF_SIZE, SE_IBF_HASH_NUM);
+  return set_state;
 }
 
 
 /**
  * Add the element from the given element message to the set.
  *
 }
 
 
 /**
  * Add the element from the given element message to the set.
  *
- * @param m message with the element
- * @param set set to add the element to
+ * @param set_state state of the set want to add to
+ * @param ee the element to add to the set
  */
  */
-void
-_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set)
+static void
+union_add (struct SetState *set_state, struct ElementEntry *ee)
 {
 {
-  struct ElementEntry *ee;
-  struct ElementEntry *ee_dup;
-  uint16_t element_size;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n");
-
-  GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
-  element_size = ntohs (m->header.size) - sizeof *m;
-  ee = GNUNET_malloc (element_size + sizeof *ee);
-  ee->element.size = element_size;
-  memcpy (&ee[1], &m[1], element_size);
-  ee->element.data = &ee[1];
-  ee->generation_added = set->state.u->current_generation;
-  GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
-  ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &ee->element_hash);
-  if (NULL != ee_dup)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
-    GNUNET_free (ee);
-    return;
-  }
-  GNUNET_CONTAINER_multihashmap_put (set->state.u->elements, &ee->element_hash, ee,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
-  strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash, 0));
+  strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
 }
 
 
 /**
 }
 
 
 /**
- * Destroy a set that supports the union operation
+ * Remove the element given in the element message from the set.
+ * Only marks the element as removed, so that older set operations can still exchange it.
  *
  *
- * @param set the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
+ * @param set_state state of the set to remove from
+ * @param ee set element to remove
  */
  */
-void
-_GSS_union_set_destroy (struct Set *set)
+static void
+union_remove (struct SetState *set_state, struct ElementEntry *ee)
 {
 {
-  GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
-  if (NULL != set->client)
-  {
-    GNUNET_SERVER_client_drop (set->client);
-    set->client = NULL;
-  }
-  if (NULL != set->client_mq)
-  {
-    GNUNET_MQ_destroy (set->client_mq);
-    set->client_mq = NULL;
-  }
-
-  if (NULL != set->state.u->se)
-  {
-    strata_estimator_destroy (set->state.u->se);
-    set->state.u->se = NULL;
-  }
-
-  destroy_elements (set->state.u);
-
-  while (NULL != set->state.u->ops_head)
-  {
-    _GSS_union_operation_destroy (set->state.u->ops_head);
-  }
+  strata_estimator_remove (set_state->se, get_ibf_key (&ee->element_hash, 0));
 }
 
 }
 
+
 /**
 /**
- * Remove the element given in the element message from the set.
- * Only marks the element as removed, so that older set operations can still exchange it.
+ * Destroy a set that supports the union operation.
  *
  *
- * @param m message with the element
- * @param set set to remove the element from
+ * @param set_state the set to destroy
  */
  */
-void
-_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set)
+static void
+union_set_destroy (struct SetState *set_state)
 {
 {
-  struct GNUNET_HashCode hash;
-  struct ElementEntry *ee;
-
-  GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
-  GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash);
-  ee = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &hash);
-  if (NULL == ee)
+  if (NULL != set_state->se)
   {
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
-    return;
-  }
-  if (GNUNET_YES == ee->removed)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
-    return;
+    strata_estimator_destroy (set_state->se);
+    set_state->se = NULL;
   }
   }
-  ee->removed = GNUNET_YES;
-  ee->generation_removed = set->state.u->current_generation;
+  GNUNET_free (set_state);
 }
 
 
 /**
  * Dispatch messages for a union operation.
  *
 }
 
 
 /**
  * Dispatch messages for a union operation.
  *
- * @param cls closure
- * @param tunnel mesh tunnel
- * @param tunnel_ctx tunnel context
- * @param mh message to process
- * @return ???
+ * @param op the state of the union evaluate operation
+ * @param mh the received message
+ * @return GNUNET_SYSERR if the tunnel should be disconnected,
+ *         GNUNET_OK otherwise
  */
 int
  */
 int
-_GSS_union_handle_p2p_message (void *cls,
-                               struct GNUNET_MESH_Tunnel *tunnel,
-                               void **tunnel_ctx,
-                               const struct GNUNET_MessageHeader *mh)
+union_handle_p2p_message (struct Operation *op,
+                          const struct GNUNET_MessageHeader *mh)
 {
 {
-  struct TunnelContext *tc = *tunnel_ctx;
-  struct UnionEvaluateOperation *eo;
-
-  if (CONTEXT_OPERATION_UNION != tc->type)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-
-  eo = tc->data;
-
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
+              ntohs (mh->type), ntohs (mh->size));
   switch (ntohs (mh->type))
   {
   switch (ntohs (mh->type))
   {
-    case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
-      handle_p2p_ibf (eo, mh);
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
+      handle_p2p_ibf (op, mh);
       break;
       break;
-    case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
-      handle_p2p_strata_estimator (eo, mh);
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
+      handle_p2p_strata_estimator (op, mh);
       break;
     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
       break;
     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
-      handle_p2p_elements (eo, mh);
+      handle_p2p_elements (op, mh);
       break;
     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
       break;
     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
-      handle_p2p_element_requests (eo, mh);
+      handle_p2p_element_requests (op, mh);
       break;
     case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
       break;
     case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
-      handle_p2p_done (eo, mh);
+      handle_p2p_done (op, mh);
       break;
     default:
       break;
     default:
-      /* something wrong with mesh's message handlers? */
+      /* something wrong with cadet's message handlers? */
       GNUNET_assert (0);
   }
   return GNUNET_OK;
 }
       GNUNET_assert (0);
   }
   return GNUNET_OK;
 }
+
+/**
+ * handler for peer-disconnects, notifies the client
+ * about the aborted operation in case the op was not concluded
+ *
+ * @param op the destroyed operation
+ */
+static void
+union_peer_disconnect (struct Operation *op)
+{
+  if (PHASE_FINISHED != op->state->phase)
+  {
+    struct GNUNET_MQ_Envelope *ev;
+    struct GNUNET_SET_ResultMessage *msg;
+
+    ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
+    msg->request_id = htonl (op->spec->client_request_id);
+    msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
+    msg->element_type = htons (0);
+    GNUNET_MQ_send (op->spec->set->client_mq, ev);
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "other peer disconnected prematurely\n");
+    _GSS_operation_destroy (op, GNUNET_YES);
+    return;
+  }
+  // else: the session has already been concluded
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
+  if (GNUNET_NO == op->state->client_done_sent)
+    finish_and_destroy (op);
+}
+
+
+/**
+ * Get the table with implementing functions for
+ * set union.
+ *
+ * @return the operation specific VTable
+ */
+const struct SetVT *
+_GSS_union_vt ()
+{
+  static const struct SetVT union_vt = {
+    .create = &union_set_create,
+    .msg_handler = &union_handle_p2p_message,
+    .add = &union_add,
+    .remove = &union_remove,
+    .destroy_set = &union_set_destroy,
+    .evaluate = &union_evaluate,
+    .accept = &union_accept,
+    .peer_disconnect = &union_peer_disconnect,
+    .cancel = &union_op_cancel,
+  };
+
+  return &union_vt;
+}