set: destroy client mq properly
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
index e512761cc9a23052d84eed3a93a51bba79230a76..f46713c3102d25b403b3aea2ae369d6915a0e9d0 100644 (file)
@@ -1,10 +1,10 @@
 /*
       This file is part of GNUnet
 /*
       This file is part of GNUnet
-      (C) 2013 Christian Grothoff (and other contributing authors)
+      Copyright (C) 2013-2016 GNUnet e.V.
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
-      by the Free Software Foundation; either version 2, or (at your
+      by the Free Software Foundation; either version 3, or (at your
       option) any later version.
 
       GNUnet is distributed in the hope that it will be useful, but
       option) any later version.
 
       GNUnet is distributed in the hope that it will be useful, but
 
       You should have received a copy of the GNU General Public License
       along with GNUnet; see the file COPYING.  If not, write to the
 
       You should have received a copy of the GNU General Public License
       along with GNUnet; see the file COPYING.  If not, write to the
-      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-      Boston, MA 02111-1307, USA.
+      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+      Boston, MA 02110-1301, USA.
 */
 */
-
 /**
 /**
- * @file set/gnunet-service-set.c
+ * @file set/gnunet-service-set_union.c
+
  * @brief two-peer set operations
  * @author Florian Dold
  */
  * @brief two-peer set operations
  * @author Florian Dold
  */
-
-
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_statistics_service.h"
 #include "gnunet-service-set.h"
 #include "gnunet-service-set.h"
-#include "gnunet_container_lib.h"
-#include "gnunet_crypto_lib.h"
 #include "ibf.h"
 #include "ibf.h"
-#include "strata_estimator.h"
-#include "set_protocol.h"
+#include "gnunet-service-set_union_strata_estimator.h"
+#include "gnunet-service-set_protocol.h"
 #include <gcrypt.h>
 
 
 #include <gcrypt.h>
 
 
+#define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
+
+
 /**
  * Number of IBFs in a strata estimator.
  */
 #define SE_STRATA_COUNT 32
 /**
  * Number of IBFs in a strata estimator.
  */
 #define SE_STRATA_COUNT 32
+
 /**
  * Size of the IBFs in the strata estimator.
  */
 #define SE_IBF_SIZE 80
 /**
  * Size of the IBFs in the strata estimator.
  */
 #define SE_IBF_SIZE 80
+
 /**
 /**
- * hash num parameter for the difference digests and strata estimators
+ * The hash num parameter for the difference digests and strata estimators.
  */
  */
-#define SE_IBF_HASH_NUM 3
+#define SE_IBF_HASH_NUM 4
 
 /**
  * Number of buckets that can be transmitted in one message.
 
 /**
  * Number of buckets that can be transmitted in one message.
  * Choose this value so that computing the IBF is still cheaper
  * than transmitting all values.
  */
  * Choose this value so that computing the IBF is still cheaper
  * than transmitting all values.
  */
-#define MAX_IBF_ORDER (16)
+#define MAX_IBF_ORDER (20)
+
+/**
+ * Number of buckets used in the ibf per estimated
+ * difference.
+ */
+#define IBF_ALPHA 4
 
 
 /**
 
 
 /**
 enum UnionOperationPhase
 {
   /**
 enum UnionOperationPhase
 {
   /**
-   * We sent the request message, and expect a strata estimator
+   * We sent the request message, and expect a strata estimator.
    */
   PHASE_EXPECT_SE,
    */
   PHASE_EXPECT_SE,
+
   /**
   /**
-   * We sent the strata estimator, and expect an IBF
+   * We sent the strata estimator, and expect an IBF. This phase is entered once
+   * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
+   *
+   * XXX: could use better wording.
+   * XXX: repurposed to also expect a "request full set" message, should be renamed
+   *
+   * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
    */
   PHASE_EXPECT_IBF,
    */
   PHASE_EXPECT_IBF,
+
   /**
   /**
-   * We know what type of IBF the other peer wants to send us,
-   * and expect the remaining parts
+   * Continuation for multi part IBFs.
    */
   PHASE_EXPECT_IBF_CONT,
    */
   PHASE_EXPECT_IBF_CONT,
+
   /**
   /**
-   * We are sending request and elements,
-   * and thus only expect elements from the other peer.
-   */
-  PHASE_EXPECT_ELEMENTS,
-  /**
-   * We are expecting elements and requests, and send
-   * requested elements back to the other peer.
+   * We are decoding an IBF.
    */
    */
-  PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
+  PHASE_INVENTORY_ACTIVE,
+
   /**
   /**
-   * The protocol is over.
-   * Results may still have to be sent to the client.
+   * The other peer is decoding the IBF we just sent.
    */
    */
-  PHASE_FINISHED
-};
+  PHASE_INVENTORY_PASSIVE,
 
 
-
-/**
- * State of an evaluate operation
- * with another peer.
- */
-struct OperationState
-{
   /**
   /**
-   * Tunnel to the remote peer.
+   * The protocol is almost finished, but we still have to flush our message
+   * queue and/or expect some elements.
    */
    */
-  struct GNUNET_MESH_Tunnel *tunnel;
+  PHASE_FINISH_CLOSING,
 
   /**
 
   /**
-   * Detail information about the set operation,
-   * including the set to use.
+   * In the penultimate phase,
+   * we wait until all our demands
+   * are satisfied.  Then we send a done
+   * message, and wait for another done message.
    */
    */
-  struct OperationSpecification *spec;
+  PHASE_FINISH_WAITING,
 
   /**
 
   /**
-   * Message queue for the peer.
+   * In the ultimate phase, we wait until
+   * our demands are satisfied and then
+   * quit (sending another DONE message).
    */
    */
-  struct GNUNET_MQ_Handle *mq;
+  PHASE_DONE,
 
   /**
 
   /**
-   * Number of ibf buckets received
+   * After sending the full set, wait for responses with the elements
+   * that the local peer is missing.
    */
    */
-  unsigned int ibf_buckets_received;
+  PHASE_FULL_SENDING,
+};
+
 
 
+/**
+ * State of an evaluate operation with another peer.
+ */
+struct OperationState
+{
   /**
    * Copy of the set's strata estimator at the time of
   /**
    * Copy of the set's strata estimator at the time of
-   * creation of this operation
+   * creation of this operation.
    */
   struct StrataEstimator *se;
 
   /**
    */
   struct StrataEstimator *se;
 
   /**
-   * The ibf we currently receive
+   * The IBF we currently receive.
    */
   struct InvertibleBloomFilter *remote_ibf;
 
   /**
    */
   struct InvertibleBloomFilter *remote_ibf;
 
   /**
-   * IBF of the set's element.
+   * The IBF with the local set's element.
    */
   struct InvertibleBloomFilter *local_ibf;
 
   /**
    */
   struct InvertibleBloomFilter *local_ibf;
 
   /**
-   * Maps IBF-Keys (specific to the current salt) to elements.
+   * Maps unsalted IBF-Keys to elements.
+   * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
+   * Colliding IBF-Keys are linked.
    */
   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
 
    */
   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
 
@@ -150,82 +169,51 @@ struct OperationState
   enum UnionOperationPhase phase;
 
   /**
   enum UnionOperationPhase phase;
 
   /**
-   * Generation in which the operation handle
-   * was created.
+   * Did we send the client that we are done?
    */
    */
-  unsigned int generation_created;
+  int client_done_sent;
 
   /**
 
   /**
-   * Set state of the set that this operation
-   * belongs to.
-   */
-  struct SetState *set_state;
-  
-  /**
-   * Evaluate operations are held in
-   * a linked list.
+   * Number of ibf buckets already received into the @a remote_ibf.
    */
    */
-  struct OperationState *next;
-  
-   /**
-    * Evaluate operations are held in
-    * a linked list.
-    */
-  struct OperationState *prev;
-};
-
+  unsigned int ibf_buckets_received;
 
 
-/**
- * Information about an element element in the set.
- * All elements are stored in a hash-table
- * from their hash-code to their 'struct Element',
- * so that the remove and add operations are reasonably
- * fast.
- */
-struct ElementEntry
-{
   /**
   /**
-   * The actual element. The data for the element
-   * should be allocated at the end of this struct.
+   * Hashes for elements that we have demanded from the other peer.
    */
    */
-  struct GNUNET_SET_Element element;
+  struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
 
   /**
 
   /**
-   * Hash of the element.
-   * Will be used to derive the different IBF keys
-   * for different salts.
+   * Salt that we're using for sending IBFs
    */
    */
-  struct GNUNET_HashCode element_hash;
+  uint32_t salt_send;
 
   /**
 
   /**
-   * Generation the element was added by the client.
-   * Operations of earlier generations will not consider the element.
+   * Salt for the IBF we've received and that we're currently decoding.
    */
    */
-  unsigned int generation_added;
+  uint32_t salt_receive;
 
   /**
 
   /**
-   * GNUNET_YES if the element has been removed in some generation.
+   * Number of elements we received from the other peer
+   * that were not in the local set yet.
    */
    */
-  int removed;
+  uint32_t received_fresh;
 
   /**
 
   /**
-   * Generation the element was removed by the client. 
-   * Operations of later generations will not consider the element.
-   * Only valid if is_removed is GNUNET_YES.
+   * Total number of elements received from the other peer.
    */
    */
-  unsigned int generation_removed;
+  uint32_t received_total;
 
   /**
 
   /**
-   * GNUNET_YES if the element is a remote element, and does not belong
-   * to the operation's set.
+   * Initial size of our set, just before
+   * the operation started.
    */
    */
-  int remote;
+  uint64_t initial_size;
 };
 
 
 /**
 };
 
 
 /**
- * The key entry is used to associate an ibf key with
- * an element.
+ * The key entry is used to associate an ibf key with an element.
  */
 struct KeyEntry
 {
  */
 struct KeyEntry
 {
@@ -235,15 +223,20 @@ struct KeyEntry
   struct IBF_Key ibf_key;
 
   /**
   struct IBF_Key ibf_key;
 
   /**
-   * The actual element associated with the key
+   * The actual element associated with the key.
+   *
+   * Only owned by the union operation if element->operation
+   * is #GNUNET_YES.
    */
   struct ElementEntry *element;
 
   /**
    */
   struct ElementEntry *element;
 
   /**
-   * Element that collides with this element
-   * on the ibf key
+   * Did we receive this element?
+   * Even if element->is_foreign is false, we might
+   * have received the element, so this indicates that
+   * the other peer has it.
    */
    */
-  struct KeyEntry *next_colliding;
+  int received;
 };
 
 
 };
 
 
@@ -263,7 +256,7 @@ struct SendElementClosure
    * Operation for which the elements
    * should be sent.
    */
    * Operation for which the elements
    * should be sent.
    */
-  struct OperationState *eo;
+  struct Operation *op;
 };
 
 
 };
 
 
@@ -279,64 +272,18 @@ struct SetState
    * salt=0.
    */
   struct StrataEstimator *se;
    * salt=0.
    */
   struct StrataEstimator *se;
-
-  /**
-   * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
-   */
-  struct GNUNET_CONTAINER_MultiHashMap *elements;
-
-  /**
-   * Evaluate operations are held in
-   * a linked list.
-   */
-  struct OperationState *ops_head;
-
-  /**
-   * Evaluate operations are held in
-   * a linked list.
-   */
-  struct OperationState *ops_tail;
-
-  /**
-   * Current generation, that is, number of
-   * previously executed operations on this set
-   */
-  unsigned int current_generation;
 };
 
 
 };
 
 
-
-/**
- * Iterator over hash map entries.
- *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
- */
-static int
-destroy_elements_iterator (void *cls,
-                           const struct GNUNET_HashCode * key,
-                           void *value)
-{
-  struct ElementEntry *ee = value;
-
-  GNUNET_free (ee);
-  return GNUNET_YES;
-}
-
-
 /**
 /**
- * Iterator over hash map entries.
+ * Iterator over hash map entries, called to
+ * destroy the linked list of colliding ibf key entries.
  *
  * @param cls closure
  * @param key current key code
  * @param value value in the hash map
  *
  * @param cls closure
  * @param key current key code
  * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
+ * @return #GNUNET_YES if we should continue to iterate,
+ *         #GNUNET_NO if not.
  */
 static int
 destroy_key_to_element_iter (void *cls,
  */
 static int
 destroy_key_to_element_iter (void *cls,
@@ -344,82 +291,63 @@ destroy_key_to_element_iter (void *cls,
                              void *value)
 {
   struct KeyEntry *k = value;
                              void *value)
 {
   struct KeyEntry *k = value;
-  
-  while (NULL != k)
+
+  GNUNET_assert (NULL != k);
+  if (GNUNET_YES == k->element->remote)
   {
   {
-    struct KeyEntry *k_tmp = k;
-    k = k->next_colliding;
-    if (GNUNET_YES == k_tmp->element->remote)
-    {
-      GNUNET_free (k_tmp->element);
-      k_tmp->element = NULL;
-    }
-    GNUNET_free (k_tmp);
+    GNUNET_free (k->element);
+    k->element = NULL;
   }
   }
+  GNUNET_free (k);
   return GNUNET_YES;
 }
 
 
 /**
   return GNUNET_YES;
 }
 
 
 /**
- * Destroy a union operation, and free all resources
- * associated with it.
+ * Destroy the union operation.  Only things specific to the union
+ * operation are destroyed.
  *
  *
- * @param eo the union operation to destroy
+ * @param op union operation to destroy
  */
 static void
  */
 static void
-union_operation_destroy (struct OperationState *eo)
+union_op_cancel (struct Operation *op)
 {
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
-  GNUNET_CONTAINER_DLL_remove (eo->set_state->ops_head,
-                               eo->set_state->ops_tail,
-                               eo);
-  if (NULL != eo->mq)
-  {
-    GNUNET_MQ_destroy (eo->mq);
-    eo->mq = NULL;
-  }
-  if (NULL != eo->tunnel)
-  {
-    struct GNUNET_MESH_Tunnel *t = eo->tunnel;
-    eo->tunnel = NULL;
-    GNUNET_MESH_tunnel_destroy (t);
-  }
-  if (NULL != eo->remote_ibf)
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "destroying union op\n");
+  /* check if the op was canceled twice */
+  GNUNET_assert (NULL != op->state);
+  if (NULL != op->state->remote_ibf)
   {
   {
-    ibf_destroy (eo->remote_ibf);
-    eo->remote_ibf = NULL;
+    ibf_destroy (op->state->remote_ibf);
+    op->state->remote_ibf = NULL;
   }
   }
-  if (NULL != eo->local_ibf)
+  if (NULL != op->state->demanded_hashes)
   {
   {
-    ibf_destroy (eo->local_ibf);
-    eo->local_ibf = NULL;
+    GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
+    op->state->demanded_hashes = NULL;
   }
   }
-  if (NULL != eo->se)
+  if (NULL != op->state->local_ibf)
   {
   {
-    strata_estimator_destroy (eo->se);
-    eo->se = NULL;
+    ibf_destroy (op->state->local_ibf);
+    op->state->local_ibf = NULL;
   }
   }
-  if (NULL != eo->key_to_element)
+  if (NULL != op->state->se)
   {
   {
-    GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL);
-    GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
-    eo->key_to_element = NULL;
+    strata_estimator_destroy (op->state->se);
+    op->state->se = NULL;
   }
   }
-  if (NULL != eo->spec)
+  if (NULL != op->state->key_to_element)
   {
   {
-    if (NULL != eo->spec->context_msg)
-    {
-      GNUNET_free (eo->spec->context_msg);
-      eo->spec->context_msg = NULL;
-    }
-    GNUNET_free (eo->spec);
-    eo->spec = NULL;
+    GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+                                             &destroy_key_to_element_iter,
+                                             NULL);
+    GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
+    op->state->key_to_element = NULL;
   }
   }
-  GNUNET_free (eo);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
-
-  /* FIXME: do a garbage collection of the set generations */
+  GNUNET_free (op->state);
+  op->state = NULL;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "destroying union op done\n");
 }
 
 
 }
 
 
@@ -427,143 +355,176 @@ union_operation_destroy (struct OperationState *eo)
  * Inform the client that the union operation has failed,
  * and proceed to destroy the evaluate operation.
  *
  * Inform the client that the union operation has failed,
  * and proceed to destroy the evaluate operation.
  *
- * @param eo the union operation to fail
+ * @param op the union operation to fail
  */
 static void
  */
 static void
-fail_union_operation (struct OperationState *eo)
+fail_union_operation (struct Operation *op)
 {
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_SET_ResultMessage *msg;
 
 {
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_SET_ResultMessage *msg;
 
+  LOG (GNUNET_ERROR_TYPE_ERROR,
+       "union operation failed\n");
   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
-  msg->request_id = htonl (eo->spec->client_request_id);
+  msg->request_id = htonl (op->spec->client_request_id);
   msg->element_type = htons (0);
   msg->element_type = htons (0);
-  GNUNET_MQ_send (eo->spec->set->client_mq, ev);
-  union_operation_destroy (eo);
+  GNUNET_MQ_send (op->spec->set->client_mq, ev);
+  _GSS_operation_destroy (op, GNUNET_YES);
 }
 
 
 /**
 }
 
 
 /**
- * Derive the IBF key from a hash code and 
+ * Derive the IBF key from a hash code and
  * a salt.
  *
  * @param src the hash code
  * a salt.
  *
  * @param src the hash code
- * @param salt salt to use
  * @return the derived IBF key
  */
 static struct IBF_Key
  * @return the derived IBF key
  */
 static struct IBF_Key
-get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
+get_ibf_key (const struct GNUNET_HashCode *src)
 {
   struct IBF_Key key;
 {
   struct IBF_Key key;
+  uint16_t salt = 0;
 
 
-  GNUNET_CRYPTO_hkdf (&key, sizeof (key),
-                     GCRY_MD_SHA512, GCRY_MD_SHA256,
-                      src, sizeof *src,
-                     &salt, sizeof (salt),
-                     NULL, 0);
+  GNUNET_CRYPTO_kdf (&key, sizeof (key),
+                     src, sizeof *src,
+                     &salt, sizeof (salt),
+                     NULL, 0);
   return key;
 }
 
 
 /**
   return key;
 }
 
 
 /**
- * Send a request for the evaluate operation to a remote peer
- *
- * @param eo operation with the other peer
+ * Context for #op_get_element_iterator
  */
  */
-static void
-send_operation_request (struct OperationState *eo)
+struct GetElementContext
 {
 {
-  struct GNUNET_MQ_Envelope *ev;
-  struct OperationRequestMessage *msg;
-
-  ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
-                                eo->spec->context_msg);
-
-  if (NULL == ev)
-  {
-    /* the context message is too large */
-    GNUNET_break (0);
-    GNUNET_SERVER_client_disconnect (eo->spec->set->client);
-    return;
-  }
-  msg->operation = htons (GNUNET_SET_OPERATION_UNION);
-  msg->app_id = eo->spec->app_id;
-  msg->salt = htonl (eo->spec->salt);
-  GNUNET_MQ_send (eo->mq, ev);
-
-  if (NULL != eo->spec->context_msg)
-  {
-    GNUNET_free (eo->spec->context_msg);
-    eo->spec->context_msg = NULL;
-  }
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request\n");
-}
+  struct GNUNET_HashCode hash;
+  struct KeyEntry *k;
+};
 
 
 /**
 
 
 /**
- * Iterator to create the mapping between ibf keys
- * and element entries.
+ * Iterator over the mapping from IBF keys to element entries.  Checks if we
+ * have an element with a given GNUNET_HashCode.
  *
  * @param cls closure
  * @param key current key code
  * @param value value in the hash map
  *
  * @param cls closure
  * @param key current key code
  * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
+ * @return #GNUNET_YES if we should search further,
+ *         #GNUNET_NO if we've found the element.
  */
 static int
  */
 static int
-insert_element_iterator (void *cls,
+op_get_element_iterator (void *cls,
                          uint32_t key,
                          void *value)
 {
                          uint32_t key,
                          void *value)
 {
-  struct KeyEntry *const new_k = cls;
-  struct KeyEntry *old_k = value;
+  struct GetElementContext *ctx = cls;
+  struct KeyEntry *k = value;
 
 
-  GNUNET_assert (NULL != old_k);
-  do
+  GNUNET_assert (NULL != k);
+  if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
+                                   &ctx->hash))
   {
   {
-    if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
-    {
-      new_k->next_colliding = old_k->next_colliding;
-      old_k->next_colliding = new_k;
-      return GNUNET_NO;
-    }
-    old_k = old_k->next_colliding;
-  } while (NULL != old_k);
+    ctx->k = k;
+    return GNUNET_NO;
+  }
   return GNUNET_YES;
 }
 
 
   return GNUNET_YES;
 }
 
 
+/**
+ * Determine whether the given element is already in the operation's element
+ * set.
+ *
+ * @param op operation that should be tested for 'element_hash'
+ * @param element_hash hash of the element to look for
+ * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
+ */
+static struct KeyEntry *
+op_get_element (struct Operation *op,
+                const struct GNUNET_HashCode *element_hash)
+{
+  int ret;
+  struct IBF_Key ibf_key;
+  struct GetElementContext ctx = {{{ 0 }} , 0};
+
+  ctx.hash = *element_hash;
+
+  ibf_key = get_ibf_key (element_hash);
+  ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
+                                                      (uint32_t) ibf_key.key_val,
+                                                      op_get_element_iterator,
+                                                      &ctx);
+
+  /* was the iteration aborted because we found the element? */
+  if (GNUNET_SYSERR == ret)
+  {
+    GNUNET_assert (NULL != ctx.k);
+    return ctx.k;
+  }
+  return NULL;
+}
+
+
 /**
  * Insert an element into the union operation's
  * key-to-element mapping. Takes ownership of 'ee'.
 /**
  * Insert an element into the union operation's
  * key-to-element mapping. Takes ownership of 'ee'.
+ * Note that this does not insert the element in the set,
+ * only in the operation's key-element mapping.
+ * This is done to speed up re-tried operations, if some elements
+ * were transmitted, and then the IBF fails to decode.
+ *
+ * XXX: clarify ownership, doesn't sound right.
  *
  *
- * @param eo the union operation
+ * @param op the union operation
  * @param ee the element entry
  * @param ee the element entry
+ * @parem received was this element received from the remote peer?
  */
 static void
  */
 static void
-insert_element (struct OperationState *eo, struct ElementEntry *ee)
+op_register_element (struct Operation *op,
+                     struct ElementEntry *ee,
+                     int received)
 {
 {
-  int ret;
   struct IBF_Key ibf_key;
   struct KeyEntry *k;
 
   struct IBF_Key ibf_key;
   struct KeyEntry *k;
 
-  ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt);
+  ibf_key = get_ibf_key (&ee->element_hash);
   k = GNUNET_new (struct KeyEntry);
   k->element = ee;
   k->ibf_key = ibf_key;
   k = GNUNET_new (struct KeyEntry);
   k->element = ee;
   k->ibf_key = ibf_key;
-  ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
+  k->received = received;
+  GNUNET_assert (GNUNET_OK ==
+                 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
                                                       (uint32_t) ibf_key.key_val,
                                                       (uint32_t) ibf_key.key_val,
-                                                      insert_element_iterator, k);
+                                                      k,
+                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+}
 
 
-  /* was the element inserted into a colliding bucket? */
-  if (GNUNET_SYSERR == ret)
-    return;
 
 
-  GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
-                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+static void
+salt_key (const struct IBF_Key *k_in,
+          uint32_t salt,
+          struct IBF_Key *k_out)
+{
+  int s = salt % 64;
+  uint64_t x = k_in->key_val;
+  /* rotate ibf key */
+  x = (x >> s) | (x << (64 - s));
+  k_out->key_val = x;
+}
+
+
+static void
+unsalt_key (const struct IBF_Key *k_in,
+            uint32_t salt,
+            struct IBF_Key *k_out)
+{
+  int s = salt % 64;
+  uint64_t x = k_in->key_val;
+  x = (x << s) | (x >> (64 - s));
+  k_out->key_val = x;
 }
 
 
 }
 
 
@@ -579,10 +540,17 @@ prepare_ibf_iterator (void *cls,
                       uint32_t key,
                       void *value)
 {
                       uint32_t key,
                       void *value)
 {
-  struct InvertibleBloomFilter *ibf = cls;
+  struct Operation *op = cls;
   struct KeyEntry *ke = value;
   struct KeyEntry *ke = value;
-
-  ibf_insert (ibf, ke->ibf_key);
+  struct IBF_Key salted_key;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "[OP %x] inserting %lx (hash %s) into ibf\n",
+       (void *) op,
+       (unsigned long) ke->ibf_key.key_val,
+       GNUNET_h2s (&ke->element->element_hash));
+  salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
+  ibf_insert (op->state->local_ibf, salted_key);
   return GNUNET_YES;
 }
 
   return GNUNET_YES;
 }
 
@@ -591,76 +559,114 @@ prepare_ibf_iterator (void *cls,
  * Iterator for initializing the
  * key-to-element mapping of a union operation
  *
  * Iterator for initializing the
  * key-to-element mapping of a union operation
  *
- * @param cls the union operation
- * @param key unised
- * @param value the element entry to insert
+ * @param cls the union operation `struct Operation *`
+ * @param key unused
+ * @param value the `struct ElementEntry *` to insert
  *        into the key-to-element mapping
  *        into the key-to-element mapping
+ * @return #GNUNET_YES (to continue iterating)
  */
 static int
 init_key_to_element_iterator (void *cls,
                               const struct GNUNET_HashCode *key,
                               void *value)
 {
  */
 static int
 init_key_to_element_iterator (void *cls,
                               const struct GNUNET_HashCode *key,
                               void *value)
 {
-  struct OperationState *eo = cls;
-  struct ElementEntry *e = value;
+  struct Operation *op = cls;
+  struct ElementEntry *ee = value;
 
   /* make sure that the element belongs to the set at the time
    * of creating the operation */
 
   /* make sure that the element belongs to the set at the time
    * of creating the operation */
-  if ( (e->generation_added > eo->generation_created) ||
-       ( (GNUNET_YES == e->removed) &&
-         (e->generation_removed < eo->generation_created)))
+  if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
     return GNUNET_YES;
 
     return GNUNET_YES;
 
-  e->remote = GNUNET_NO;
+  GNUNET_assert (GNUNET_NO == ee->remote);
 
 
-  insert_element (eo, e);
+  op_register_element (op, ee, GNUNET_NO);
   return GNUNET_YES;
 }
 
 
   return GNUNET_YES;
 }
 
 
+/**
+ * Initialize the IBF key to element mapping local to this set
+ * operation.
+ *
+ * @param op the set union operation
+ */
+static void
+initialize_key_to_element (struct Operation *op)
+{
+  unsigned int len;
+
+  GNUNET_assert (NULL == op->state->key_to_element);
+  len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
+  op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
+  GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op);
+}
+
+
 /**
  * Create an ibf with the operation's elements
  * of the specified size
  *
 /**
  * Create an ibf with the operation's elements
  * of the specified size
  *
- * @param eo the union operation
+ * @param op the union operation
  * @param size size of the ibf to create
  * @param size size of the ibf to create
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
  */
  */
-static void
-prepare_ibf (struct OperationState *eo, uint16_t size)
+static int
+prepare_ibf (struct Operation *op,
+             uint32_t size)
 {
 {
-  if (NULL == eo->key_to_element)
+  GNUNET_assert (NULL != op->state->key_to_element);
+
+  if (NULL != op->state->local_ibf)
+    ibf_destroy (op->state->local_ibf);
+  op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
+  if (NULL == op->state->local_ibf)
   {
   {
-    unsigned int len;
-    len = GNUNET_CONTAINER_multihashmap_size (eo->set_state->elements);
-    eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
-    GNUNET_CONTAINER_multihashmap_iterate (eo->set_state->elements,
-                                           init_key_to_element_iterator, eo);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Failed to allocate local IBF\n");
+    return GNUNET_SYSERR;
   }
   }
-  if (NULL != eo->local_ibf)
-    ibf_destroy (eo->local_ibf);
-  eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
-  GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element,
-                                           prepare_ibf_iterator, eo->local_ibf);
+  GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+                                           &prepare_ibf_iterator,
+                                           op);
+  return GNUNET_OK;
 }
 
 
 /**
  * Send an ibf of appropriate size.
  *
 }
 
 
 /**
  * Send an ibf of appropriate size.
  *
- * @param eo the union operation
+ * Fragments the IBF into multiple messages if necessary.
+ *
+ * @param op the union operation
  * @param ibf_order order of the ibf to send, size=2^order
  * @param ibf_order order of the ibf to send, size=2^order
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
  */
  */
-static void
-send_ibf (struct OperationState *eo, uint16_t ibf_order)
+static int
+send_ibf (struct Operation *op,
+          uint16_t ibf_order)
 {
   unsigned int buckets_sent = 0;
   struct InvertibleBloomFilter *ibf;
 
 {
   unsigned int buckets_sent = 0;
   struct InvertibleBloomFilter *ibf;
 
-  prepare_ibf (eo, 1<<ibf_order);
+  if (GNUNET_OK !=
+      prepare_ibf (op, 1<<ibf_order))
+  {
+    /* allocation failed */
+    return GNUNET_SYSERR;
+  }
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "sending ibf of size %u\n",
+       1<<ibf_order);
 
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
+  {
+    char name[64] = { 0 };
+    snprintf (name, sizeof (name), "# sent IBF (order %u)", ibf_order);
+    GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
+  }
 
 
-  ibf = eo->local_ibf;
+  ibf = op->state->local_ibf;
 
   while (buckets_sent < (1 << ibf_order))
   {
 
   while (buckets_sent < (1 << ibf_order))
   {
@@ -673,41 +679,67 @@ send_ibf (struct OperationState *eo, uint16_t ibf_order)
     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
 
     if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
       buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
 
-    ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
-                               GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
-    msg->reserved = 0;
+    ev = GNUNET_MQ_msg_extra (msg,
+                              buckets_in_message * IBF_BUCKET_SIZE,
+                              GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
+    msg->reserved1 = 0;
+    msg->reserved2 = 0;
     msg->order = ibf_order;
     msg->order = ibf_order;
-    msg->offset = htons (buckets_sent);
+    msg->offset = htonl (buckets_sent);
+    msg->salt = htonl (op->state->salt_send);
     ibf_write_slice (ibf, buckets_sent,
                      buckets_in_message, &msg[1]);
     buckets_sent += buckets_in_message;
     ibf_write_slice (ibf, buckets_sent,
                      buckets_in_message, &msg[1]);
     buckets_sent += buckets_in_message;
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
-                buckets_in_message, buckets_sent, 1<<ibf_order);
-    GNUNET_MQ_send (eo->mq, ev);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "ibf chunk size %u, %u/%u sent\n",
+         buckets_in_message,
+         buckets_sent,
+         1<<ibf_order);
+    GNUNET_MQ_send (op->mq, ev);
   }
 
   }
 
-  eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
+  /* The other peer must decode the IBF, so
+   * we're passive. */
+  op->state->phase = PHASE_INVENTORY_PASSIVE;
+  return GNUNET_OK;
 }
 
 
 /**
  * Send a strata estimator to the remote peer.
  *
 }
 
 
 /**
  * Send a strata estimator to the remote peer.
  *
- * @param eo the union operation with the remote peer
+ * @param op the union operation with the remote peer
  */
 static void
  */
 static void
-send_strata_estimator (struct OperationState *eo)
+send_strata_estimator (struct Operation *op)
 {
 {
+  const struct StrataEstimator *se = op->state->se;
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_MQ_Envelope *ev;
-  struct GNUNET_MessageHeader *strata_msg;
-
-  ev = GNUNET_MQ_msg_header_extra (strata_msg,
-                                   SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
-                                   GNUNET_MESSAGE_TYPE_SET_P2P_SE);
-  strata_estimator_write (eo->set_state->se, &strata_msg[1]);
-  GNUNET_MQ_send (eo->mq, ev);
-  eo->phase = PHASE_EXPECT_IBF;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
+  struct StrataEstimatorMessage *strata_msg;
+  char *buf;
+  size_t len;
+  uint16_t type;
+
+  buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
+  len = strata_estimator_write (op->state->se,
+                                buf);
+  if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
+    type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
+  else
+    type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
+  ev = GNUNET_MQ_msg_extra (strata_msg,
+                            len,
+                            type);
+  GNUNET_memcpy (&strata_msg[1],
+          buf,
+          len);
+  GNUNET_free (buf);
+  strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
+  GNUNET_MQ_send (op->mq,
+                  ev);
+  op->state->phase = PHASE_EXPECT_IBF;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "sent SE, expecting IBF\n");
 }
 
 
 }
 
 
@@ -724,11 +756,56 @@ get_order_from_difference (unsigned int diff)
   unsigned int ibf_order;
 
   ibf_order = 2;
   unsigned int ibf_order;
 
   ibf_order = 2;
-  while ((1<<ibf_order) < (2 * diff))
+  while ( (1<<ibf_order) < (IBF_ALPHA * diff) ||
+          ((1<<ibf_order) < SE_IBF_HASH_NUM) )
     ibf_order++;
   if (ibf_order > MAX_IBF_ORDER)
     ibf_order = MAX_IBF_ORDER;
     ibf_order++;
   if (ibf_order > MAX_IBF_ORDER)
     ibf_order = MAX_IBF_ORDER;
-  return ibf_order;
+  // add one for correction
+  return ibf_order + 1;
+}
+
+
+/**
+ * Send a set element.
+ *
+ * @param cls the union operation `struct Operation *`
+ * @param key unused
+ * @param value the `struct ElementEntry *` to insert
+ *        into the key-to-element mapping
+ * @return #GNUNET_YES (to continue iterating)
+ */
+static int
+send_element_iterator (void *cls,
+                       const struct GNUNET_HashCode *key,
+                       void *value)
+{
+  struct Operation *op = cls;
+  struct GNUNET_SET_ElementMessage *emsg;
+  struct ElementEntry *ee = value;
+  struct GNUNET_SET_Element *el = &ee->element;
+  struct GNUNET_MQ_Envelope *ev;
+
+
+  ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
+  emsg->element_type = htons (el->element_type);
+  GNUNET_memcpy (&emsg[1], el->data, el->size);
+  GNUNET_MQ_send (op->mq, ev);
+  return GNUNET_YES;
+}
+
+
+static void
+send_full_set (struct Operation *op)
+{
+  struct GNUNET_MQ_Envelope *ev;
+
+  op->state->phase = PHASE_FULL_SENDING;
+
+  (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
+                                                &send_element_iterator, op);
+  ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+  GNUNET_MQ_send (op->mq, ev);
 }
 
 
 }
 
 
@@ -737,33 +814,127 @@ get_order_from_difference (unsigned int diff)
  *
  * @param cls the union operation
  * @param mh the message
  *
  * @param cls the union operation
  * @param mh the message
+ * @param is_compressed #GNUNET_YES if the estimator is compressed
+ * @return #GNUNET_SYSERR if the tunnel should be disconnected,
+ *         #GNUNET_OK otherwise
  */
  */
-static void
-handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
+static int
+handle_p2p_strata_estimator (void *cls,
+                             const struct GNUNET_MessageHeader *mh,
+                             int is_compressed)
 {
 {
-  struct OperationState *eo = cls;
+  struct Operation *op = cls;
   struct StrataEstimator *remote_se;
   struct StrataEstimator *remote_se;
-  int diff;
+  struct StrataEstimatorMessage *msg = (void *) mh;
+  unsigned int diff;
+  uint64_t other_size;
+  size_t len;
+
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# bytes of SE received",
+                            ntohs (mh->size),
+                            GNUNET_NO);
 
 
-  if (eo->phase != PHASE_EXPECT_SE)
+  if (op->state->phase != PHASE_EXPECT_SE)
   {
   {
-    fail_union_operation (eo);
     GNUNET_break (0);
     GNUNET_break (0);
-    return;
+    fail_union_operation (op);
+    return GNUNET_SYSERR;
+  }
+  len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
+  if ( (GNUNET_NO == is_compressed) &&
+       (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
+  {
+    fail_union_operation (op);
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
   }
   }
-  remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
+  other_size = GNUNET_ntohll (msg->set_size);
+  remote_se = strata_estimator_create (SE_STRATA_COUNT,
+                                       SE_IBF_SIZE,
                                        SE_IBF_HASH_NUM);
                                        SE_IBF_HASH_NUM);
-  strata_estimator_read (&mh[1], remote_se);
-  GNUNET_assert (NULL != eo->se);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se, calculating diff\n");
-  diff = strata_estimator_difference (remote_se, eo->se);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "se diff=%d\n", diff);
+  if (NULL == remote_se)
+  {
+    /* insufficient resources, fail */
+    fail_union_operation (op);
+    return GNUNET_SYSERR;
+  }
+  if (GNUNET_OK !=
+      strata_estimator_read (&msg[1],
+                             len,
+                             is_compressed,
+                             remote_se))
+  {
+    /* decompression failed */
+    fail_union_operation (op);
+    strata_estimator_destroy (remote_se);
+    return GNUNET_SYSERR;
+  }
+  GNUNET_assert (NULL != op->state->se);
+  diff = strata_estimator_difference (remote_se,
+                                      op->state->se);
+
+  if (diff > 200)
+    diff = diff * 3 / 2; 
+
   strata_estimator_destroy (remote_se);
   strata_estimator_destroy (remote_se);
-  strata_estimator_destroy (eo->se);
-  eo->se = NULL;
-  send_ibf (eo, get_order_from_difference (diff));
-}
+  strata_estimator_destroy (op->state->se);
+  op->state->se = NULL;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "got se diff=%d, using ibf size %d\n",
+       diff,
+       1<<get_order_from_difference (diff));
+
+  if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
+  {
+    GNUNET_break (0);
+    fail_union_operation (op);
+    return GNUNET_SYSERR;
+  }
+
+
+  if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 4))
+  {
+    LOG (GNUNET_ERROR_TYPE_INFO,
+         "Sending full set (diff=%d, own set=%u)\n",
+         diff,
+         op->state->initial_size);
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# of full sends",
+                              1,
+                              GNUNET_NO);
+    if (op->state->initial_size <= other_size)
+    {
+      send_full_set (op);
+    }
+    else
+    {
+      struct GNUNET_MQ_Envelope *ev;
+      op->state->phase = PHASE_EXPECT_IBF;
+      ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
+      GNUNET_MQ_send (op->mq, ev);
+    }
+  }
+  else
+  {
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# of ibf sends",
+                              1,
+                              GNUNET_NO);
+    if (GNUNET_OK !=
+        send_ibf (op,
+                  get_order_from_difference (diff)))
+    {
+      /* Internal error, best we can do is shut the connection */
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Failed to send IBF, closing connection\n");
+      fail_union_operation (op);
+      return GNUNET_SYSERR;
+    }
+  }
 
 
+  return GNUNET_OK;
+}
 
 
 /**
 
 
 /**
@@ -774,92 +945,121 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
  * @param value the key entry
  */
 static int
  * @param value the key entry
  */
 static int
-send_element_iterator (void *cls,
-                       uint32_t key,
-                       void *value)
+send_offers_iterator (void *cls,
+                      uint32_t key,
+                      void *value)
 {
   struct SendElementClosure *sec = cls;
 {
   struct SendElementClosure *sec = cls;
-  struct IBF_Key ibf_key = sec->ibf_key;
-  struct OperationState *eo = sec->eo;
+  struct Operation *op = sec->op;
   struct KeyEntry *ke = value;
   struct KeyEntry *ke = value;
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_MessageHeader *mh;
 
 
-  if (ke->ibf_key.key_val != ibf_key.key_val)
+  /* Detect 32-bit key collision for the 64-bit IBF keys. */
+  if (ke->ibf_key.key_val != sec->ibf_key.key_val)
     return GNUNET_YES;
     return GNUNET_YES;
-  while (NULL != ke)
-  {
-    const struct GNUNET_SET_Element *const element = &ke->element->element;
-    struct GNUNET_MQ_Envelope *ev;
-    struct GNUNET_MessageHeader *mh;
 
 
-    GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
-    ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
-    if (NULL == ev)
-    {
-      /* element too large */
-      GNUNET_break (0);
-      continue;
-    }
-    memcpy (&mh[1], element->data, element->size);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element to peer\n");
-    GNUNET_MQ_send (eo->mq, ev);
-    ke = ke->next_colliding;
-  }
-  return GNUNET_NO;
+  ev = GNUNET_MQ_msg_header_extra (mh,
+                                   sizeof (struct GNUNET_HashCode),
+                                   GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
+
+  GNUNET_assert (NULL != ev);
+  *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "[OP %x] sending element offer (%s) to peer\n",
+       (void *) op,
+       GNUNET_h2s (&ke->element->element_hash));
+  GNUNET_MQ_send (op->mq, ev);
+  return GNUNET_YES;
 }
 
 }
 
+
 /**
 /**
- * Send all elements that have the specified IBF key
- * to the remote peer of the union operation
+ * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
  *
  *
- * @param eo union operation
+ * @param op union operation
  * @param ibf_key IBF key of interest
  */
 static void
  * @param ibf_key IBF key of interest
  */
 static void
-send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key)
+send_offers_for_key (struct Operation *op,
+                     struct IBF_Key ibf_key)
 {
   struct SendElementClosure send_cls;
 
   send_cls.ibf_key = ibf_key;
 {
   struct SendElementClosure send_cls;
 
   send_cls.ibf_key = ibf_key;
-  send_cls.eo = eo;
-  GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val,
-                                                &send_element_iterator, &send_cls);
+  send_cls.op = op;
+  (void) GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
+                                                       (uint32_t) ibf_key.key_val,
+                                                       &send_offers_iterator,
+                                                       &send_cls);
 }
 
 
 /**
  * Decode which elements are missing on each side, and
 }
 
 
 /**
  * Decode which elements are missing on each side, and
- * send the appropriate elemens and requests
+ * send the appropriate offers and inquiries.
  *
  *
- * @param eo union operation
+ * @param op union operation
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
  */
  */
-static void
-decode_and_send (struct OperationState *eo)
+static int
+decode_and_send (struct Operation *op)
 {
   struct IBF_Key key;
 {
   struct IBF_Key key;
+  struct IBF_Key last_key;
   int side;
   unsigned int num_decoded;
   struct InvertibleBloomFilter *diff_ibf;
 
   int side;
   unsigned int num_decoded;
   struct InvertibleBloomFilter *diff_ibf;
 
-  GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
+  GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
+
+  if (GNUNET_OK !=
+      prepare_ibf (op, op->state->remote_ibf->size))
+  {
+    GNUNET_break (0);
+    /* allocation failed */
+    return GNUNET_SYSERR;
+  }
+  diff_ibf = ibf_dup (op->state->local_ibf);
+  ibf_subtract (diff_ibf, op->state->remote_ibf);
+
+  ibf_destroy (op->state->remote_ibf);
+  op->state->remote_ibf = NULL;
 
 
-  prepare_ibf (eo, eo->remote_ibf->size);
-  diff_ibf = ibf_dup (eo->local_ibf);
-  ibf_subtract (diff_ibf, eo->remote_ibf);
-  
-  ibf_destroy (eo->remote_ibf);
-  eo->remote_ibf = NULL;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "decoding IBF (size=%u)\n",
+       diff_ibf->size);
 
   num_decoded = 0;
 
   num_decoded = 0;
+  key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
 
   while (1)
   {
     int res;
 
   while (1)
   {
     int res;
+    int cycle_detected = GNUNET_NO;
+
+    last_key = key;
 
     res = ibf_decode (diff_ibf, &side, &key);
 
     res = ibf_decode (diff_ibf, &side, &key);
-    num_decoded += 1;
-    if (num_decoded > diff_ibf->size)
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf\n");
-    if ((GNUNET_SYSERR == res) || (num_decoded > diff_ibf->size))
+    if (res == GNUNET_OK)
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "decoded ibf key %lx\n",
+           (unsigned long) key.key_val);
+      num_decoded += 1;
+      if ( (num_decoded > diff_ibf->size) ||
+           ( (num_decoded > 1) &&
+             (last_key.key_val == key.key_val) ) )
+      {
+        LOG (GNUNET_ERROR_TYPE_DEBUG,
+             "detected cyclic ibf (decoded %u/%u)\n",
+             num_decoded,
+             diff_ibf->size);
+        cycle_detected = GNUNET_YES;
+      }
+    }
+    if ( (GNUNET_SYSERR == res) ||
+         (GNUNET_YES == cycle_detected) )
     {
       int next_order;
       next_order = 0;
     {
       int next_order;
       next_order = 0;
@@ -868,15 +1068,37 @@ decode_and_send (struct OperationState *eo)
       next_order++;
       if (next_order <= MAX_IBF_ORDER)
       {
       next_order++;
       if (next_order <= MAX_IBF_ORDER)
       {
-        GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 
-                   "decoding failed, sending larger ibf (size %u)\n",
-                    1<<next_order);
-        send_ibf (eo, next_order);
+        LOG (GNUNET_ERROR_TYPE_DEBUG,
+             "decoding failed, sending larger ibf (size %u)\n",
+             1<<next_order);
+        GNUNET_STATISTICS_update (_GSS_statistics,
+                                  "# of IBF retries",
+                                  1,
+                                  GNUNET_NO);
+        op->state->salt_send++;
+        if (GNUNET_OK !=
+            send_ibf (op, next_order))
+        {
+          /* Internal error, best we can do is shut the connection */
+          GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                      "Failed to send IBF, closing connection\n");
+          fail_union_operation (op);
+          ibf_destroy (diff_ibf);
+          return GNUNET_SYSERR;
+        }
       }
       else
       {
       }
       else
       {
-        GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 
-                   "set union failed: reached ibf limit\n");
+        GNUNET_STATISTICS_update (_GSS_statistics,
+                                  "# of failed union operations (too large)",
+                                  1,
+                                  GNUNET_NO);
+        // XXX: Send the whole set, element-by-element
+        LOG (GNUNET_ERROR_TYPE_ERROR,
+             "set union failed: reached ibf limit\n");
+        fail_union_operation (op);
+        ibf_destroy (diff_ibf);
+        return GNUNET_SYSERR;
       }
       break;
     }
       }
       break;
     }
@@ -884,69 +1106,126 @@ decode_and_send (struct OperationState *eo)
     {
       struct GNUNET_MQ_Envelope *ev;
 
     {
       struct GNUNET_MQ_Envelope *ev;
 
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
-      ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
-      GNUNET_MQ_send (eo->mq, ev);
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "transmitted all values, sending DONE\n");
+      ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
+      GNUNET_MQ_send (op->mq, ev);
+      /* We now wait until we get a DONE message back
+       * and then wait for our MQ to be flushed and all our
+       * demands be delivered. */
       break;
     }
     if (1 == side)
     {
       break;
     }
     if (1 == side)
     {
-      send_elements_for_key (eo, key);
+      struct IBF_Key unsalted_key;
+      unsalt_key (&key, op->state->salt_receive, &unsalted_key);
+      send_offers_for_key (op, unsalted_key);
     }
     }
-    else
+    else if (-1 == side)
     {
       struct GNUNET_MQ_Envelope *ev;
     {
       struct GNUNET_MQ_Envelope *ev;
-      struct GNUNET_MessageHeader *msg;
-
-      /* FIXME: before sending the request, check if we may just have the element */
-      /* FIXME: merge multiple requests */
-      ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
-                                        GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
-      *(struct IBF_Key *) &msg[1] = key;
-      GNUNET_MQ_send (eo->mq, ev);
+      struct InquiryMessage *msg;
+
+      /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
+       * the effort additional complexity. */
+      ev = GNUNET_MQ_msg_extra (msg,
+                                sizeof (struct IBF_Key),
+                                GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
+      msg->salt = htonl (op->state->salt_receive);
+      GNUNET_memcpy (&msg[1],
+              &key,
+              sizeof (struct IBF_Key));
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "sending element inquiry for IBF key %lx\n",
+           (unsigned long) key.key_val);
+      GNUNET_MQ_send (op->mq, ev);
+    }
+    else
+    {
+      GNUNET_assert (0);
     }
   }
   ibf_destroy (diff_ibf);
     }
   }
   ibf_destroy (diff_ibf);
+  return GNUNET_OK;
 }
 
 
 /**
  * Handle an IBF message from a remote peer.
  *
 }
 
 
 /**
  * Handle an IBF message from a remote peer.
  *
+ * Reassemble the IBF from multiple pieces, and
+ * process the whole IBF once possible.
+ *
  * @param cls the union operation
  * @param mh the header of the message
  * @param cls the union operation
  * @param mh the header of the message
+ * @return #GNUNET_SYSERR if the tunnel should be disconnected,
+ *         #GNUNET_OK otherwise
  */
  */
-static void
-handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
+static int
+handle_p2p_ibf (void *cls,
+                const struct GNUNET_MessageHeader *mh)
 {
 {
-  struct OperationState *eo = cls;
-  struct IBFMessage *msg = (struct IBFMessage *) mh;
+  struct Operation *op = cls;
+  const struct IBFMessage *msg;
   unsigned int buckets_in_message;
 
   unsigned int buckets_in_message;
 
-  if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
-       (eo->phase == PHASE_EXPECT_IBF) )
+  if (ntohs (mh->size) < sizeof (struct IBFMessage))
+  {
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return GNUNET_SYSERR;
+  }
+  msg = (const struct IBFMessage *) mh;
+  if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
+       (op->state->phase == PHASE_EXPECT_IBF) )
   {
   {
-    eo->phase = PHASE_EXPECT_IBF_CONT;
-    GNUNET_assert (NULL == eo->remote_ibf);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
-    eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
-    eo->ibf_buckets_received = 0;
-    if (0 != ntohs (msg->offset))
+    op->state->phase = PHASE_EXPECT_IBF_CONT;
+    GNUNET_assert (NULL == op->state->remote_ibf);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Creating new ibf of size %u\n",
+         1 << msg->order);
+    op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
+    op->state->salt_receive = ntohl (msg->salt);
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
+    if (NULL == op->state->remote_ibf)
     {
     {
-      GNUNET_break (0);
-      fail_union_operation (eo);
-      return;
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Failed to parse remote IBF, closing connection\n");
+      fail_union_operation (op);
+      return GNUNET_SYSERR;
+    }
+    op->state->ibf_buckets_received = 0;
+    if (0 != ntohl (msg->offset))
+    {
+      GNUNET_break_op (0);
+      fail_union_operation (op);
+      return GNUNET_SYSERR;
     }
   }
     }
   }
-  else if (eo->phase == PHASE_EXPECT_IBF_CONT)
+  else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
   {
   {
-    if ( (ntohs (msg->offset) != eo->ibf_buckets_received) ||
-         (1<<msg->order != eo->remote_ibf->size) )
+    if (ntohl (msg->offset) != op->state->ibf_buckets_received)
     {
     {
-      GNUNET_break (0);
-      fail_union_operation (eo);
-      return;
+      GNUNET_break_op (0);
+      fail_union_operation (op);
+      return GNUNET_SYSERR;
     }
     }
+    if (1<<msg->order != op->state->remote_ibf->size)
+    {
+      GNUNET_break_op (0);
+      fail_union_operation (op);
+      return GNUNET_SYSERR;
+    }
+    if (ntohl (msg->salt) != op->state->salt_receive)
+    {
+      GNUNET_break_op (0);
+      fail_union_operation (op);
+      return GNUNET_SYSERR;
+    }
+  }
+  else
+  {
+    GNUNET_assert (0);
   }
 
   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
   }
 
   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
@@ -954,27 +1233,40 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
   if (0 == buckets_in_message)
   {
     GNUNET_break_op (0);
   if (0 == buckets_in_message)
   {
     GNUNET_break_op (0);
-    fail_union_operation (eo);
-    return;
+    fail_union_operation (op);
+    return GNUNET_SYSERR;
   }
 
   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
   {
   }
 
   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
   {
-    GNUNET_break (0);
-    fail_union_operation (eo);
-    return;
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return GNUNET_SYSERR;
   }
   }
-  
-  ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf);
-  eo->ibf_buckets_received += buckets_in_message;
 
 
-  if (eo->ibf_buckets_received == eo->remote_ibf->size)
-  {
+  GNUNET_assert (NULL != op->state->remote_ibf);
+
+  ibf_read_slice (&msg[1],
+                  op->state->ibf_buckets_received,
+                  buckets_in_message,
+                  op->state->remote_ibf);
+  op->state->ibf_buckets_received += buckets_in_message;
 
 
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
-    eo->phase = PHASE_EXPECT_ELEMENTS;
-    decode_and_send (eo);
+  if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "received full ibf\n");
+    op->state->phase = PHASE_INVENTORY_ACTIVE;
+    if (GNUNET_OK !=
+        decode_and_send (op))
+    {
+      /* Internal error, best we can do is shut down */
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Failed to decode IBF, closing connection\n");
+      return GNUNET_SYSERR;
+    }
   }
   }
+  return GNUNET_OK;
 }
 
 
 }
 
 
@@ -982,18 +1274,22 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
  * Send a result message to the client indicating
  * that there is a new element.
  *
  * Send a result message to the client indicating
  * that there is a new element.
  *
- * @param eo union operation
+ * @param op union operation
  * @param element element to send
  * @param element element to send
+ * @param status status to send with the new element
  */
 static void
  */
 static void
-send_client_element (struct OperationState *eo,
-                     struct GNUNET_SET_Element *element)
+send_client_element (struct Operation *op,
+                     struct GNUNET_SET_Element *element,
+                     int status)
 {
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_SET_ResultMessage *rm;
 
 {
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_SET_ResultMessage *rm;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending el of size %u\n", element->size);
-  GNUNET_assert (0 != eo->spec->client_request_id);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "sending element (size %u) to client\n",
+       element->size);
+  GNUNET_assert (0 != op->spec->client_request_id);
   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
   if (NULL == ev)
   {
   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
   if (NULL == ev)
   {
@@ -1001,254 +1297,764 @@ send_client_element (struct OperationState *eo,
     GNUNET_break (0);
     return;
   }
     GNUNET_break (0);
     return;
   }
-  rm->result_status = htons (GNUNET_SET_STATUS_OK);
-  rm->request_id = htonl (eo->spec->client_request_id);
-  rm->element_type = element->type;
-  memcpy (&rm[1], element->data, element->size);
-  GNUNET_MQ_send (eo->spec->set->client_mq, ev);
+  rm->result_status = htons (status);
+  rm->request_id = htonl (op->spec->client_request_id);
+  rm->element_type = htons (element->element_type);
+  rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
+  GNUNET_memcpy (&rm[1], element->data, element->size);
+  GNUNET_MQ_send (op->spec->set->client_mq, ev);
 }
 
 
 /**
 }
 
 
 /**
- * Send a result message to the client indicating
- * that the operation is over.
- * After the result done message has been sent to the client,
- * destroy the evaluate operation.
+ * Signal to the client that the operation has finished and
+ * destroy the operation.
  *
  *
- * @param eo union operation
+ * @param cls operation to destroy
  */
 static void
  */
 static void
-send_client_done_and_destroy (struct OperationState *eo)
+send_done_and_destroy (void *cls)
 {
 {
+  struct Operation *op = cls;
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_SET_ResultMessage *rm;
 
   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_SET_ResultMessage *rm;
 
   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
-  rm->request_id = htonl (eo->spec->client_request_id);
+  rm->request_id = htonl (op->spec->client_request_id);
   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
   rm->element_type = htons (0);
   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
   rm->element_type = htons (0);
-  GNUNET_MQ_send (eo->spec->set->client_mq, ev);
+  rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
+  GNUNET_MQ_send (op->spec->set->client_mq, ev);
+  /* Will also call the union-specific cancel function. */
+  _GSS_operation_destroy (op, GNUNET_YES);
+}
 
 
+
+static void
+maybe_finish (struct Operation *op)
+{
+  unsigned int num_demanded;
+
+  num_demanded = GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes);
+
+  if (PHASE_FINISH_WAITING == op->state->phase)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "In PHASE_FINISH_WAITING, pending %u demands\n",
+         num_demanded);
+    if (0 == num_demanded)
+    {
+      struct GNUNET_MQ_Envelope *ev;
+
+      op->state->phase = PHASE_DONE;
+      ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
+      GNUNET_MQ_send (op->mq, ev);
+
+      /* We now wait until the other peer closes the channel
+       * after it got all elements from us. */
+    }
+  }
+  if (PHASE_FINISH_CLOSING == op->state->phase)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "In PHASE_FINISH_CLOSING, pending %u demands\n",
+         num_demanded);
+    if (0 == num_demanded)
+    {
+      op->state->phase = PHASE_DONE;
+      send_done_and_destroy (op);
+    }
+  }
 }
 
 
 /**
  * Handle an element message from a remote peer.
 }
 
 
 /**
  * Handle an element message from a remote peer.
+ * Sent by the other peer either because we decoded an IBF and placed a demand,
+ * or because the other peer switched to full set transmission.
  *
  * @param cls the union operation
  * @param mh the message
  */
 static void
  *
  * @param cls the union operation
  * @param mh the message
  */
 static void
-handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
+handle_p2p_elements (void *cls,
+                     const struct GNUNET_MessageHeader *mh)
 {
 {
-  struct OperationState *eo = cls;
+  struct Operation *op = cls;
   struct ElementEntry *ee;
   struct ElementEntry *ee;
+  const struct GNUNET_SET_ElementMessage *emsg;
   uint16_t element_size;
 
   uint16_t element_size;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
-
-  if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
-       (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
+  if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
   {
   {
-    fail_union_operation (eo);
-    GNUNET_break (0);
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+  if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
+  {
+    GNUNET_break_op (0);
+    fail_union_operation (op);
     return;
   }
     return;
   }
-  element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
-  ee = GNUNET_malloc (sizeof *eo + element_size);
-  memcpy (&ee[1], &mh[1], element_size);
+
+  emsg = (const struct GNUNET_SET_ElementMessage *) mh;
+
+  element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
+  ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
+  GNUNET_memcpy (&ee[1], &emsg[1], element_size);
   ee->element.size = element_size;
   ee->element.data = &ee[1];
   ee->element.size = element_size;
   ee->element.data = &ee[1];
+  ee->element.element_type = ntohs (emsg->element_type);
   ee->remote = GNUNET_YES;
   ee->remote = GNUNET_YES;
+  GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
 
 
-  insert_element (eo, ee);
-  send_client_element (eo, &ee->element);
+  if (GNUNET_NO ==
+      GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
+                                            &ee->element_hash,
+                                            NULL))
+  {
+    /* We got something we didn't demand, since it's not in our map. */
+    GNUNET_break_op (0);
+    GNUNET_free (ee);
+    fail_union_operation (op);
+    return;
+  }
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Got element (size %u, hash %s) from peer\n",
+       (unsigned int) element_size,
+       GNUNET_h2s (&ee->element_hash));
+
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# received elements",
+                            1,
+                            GNUNET_NO);
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# exchanged elements",
+                            1,
+                            GNUNET_NO);
+
+  op->state->received_total += 1;
+
+  struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+
+  if (NULL != ke)
+  {
+    /* Got repeated element.  Should not happen since
+     * we track demands. */
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# repeated elements",
+                              1,
+                              GNUNET_NO);
+    ke->received = GNUNET_YES;
+    GNUNET_free (ee);
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Registering new element from remote peer\n");
+    op->state->received_fresh += 1;
+    op_register_element (op, ee, GNUNET_YES);
+    /* only send results immediately if the client wants it */
+    switch (op->spec->result_mode)
+    {
+      case GNUNET_SET_RESULT_ADDED:
+        send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
+        break;
+      case GNUNET_SET_RESULT_SYMMETRIC:
+        send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
+        break;
+      default:
+        /* Result mode not supported, should have been caught earlier. */
+        GNUNET_break (0);
+        break;
+    }
+  }
+
+  if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
+  {
+    /* The other peer gave us lots of old elements, there's something wrong. */
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+
+  maybe_finish (op);
 }
 
 
 /**
 }
 
 
 /**
- * Handle an element request from a remote peer.
+ * Handle an element message from a remote peer.
  *
  * @param cls the union operation
  * @param mh the message
  */
 static void
  *
  * @param cls the union operation
  * @param mh the message
  */
 static void
-handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
+handle_p2p_full_element (void *cls,
+                         const struct GNUNET_MessageHeader *mh)
 {
 {
-  struct OperationState *eo = cls;
-  struct IBF_Key *ibf_key;
-  unsigned int num_keys;
+  struct Operation *op = cls;
+  struct ElementEntry *ee;
+  const struct GNUNET_SET_ElementMessage *emsg;
+  uint16_t element_size;
 
 
-  /* look up elements and send them */
-  if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
+  if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
   {
   {
-    GNUNET_break (0);
-    fail_union_operation (eo);
+    GNUNET_break_op (0);
+    fail_union_operation (op);
     return;
   }
 
     return;
   }
 
-  num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key);
+  emsg = (const struct GNUNET_SET_ElementMessage *) mh;
+
+  element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
+  ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
+  GNUNET_memcpy (&ee[1], &emsg[1], element_size);
+  ee->element.size = element_size;
+  ee->element.data = &ee[1];
+  ee->element.element_type = ntohs (emsg->element_type);
+  ee->remote = GNUNET_YES;
+  GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Got element (full diff, size %u, hash %s) from peer\n",
+       (unsigned int) element_size,
+       GNUNET_h2s (&ee->element_hash));
+
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# received elements",
+                            1,
+                            GNUNET_NO);
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# exchanged elements",
+                            1,
+                            GNUNET_NO);
+
+  op->state->received_total += 1;
 
 
-  if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
+  struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+
+  if (NULL != ke)
   {
   {
-    GNUNET_break (0);
-    fail_union_operation (eo);
+    /* Got repeated element.  Should not happen since
+     * we track demands. */
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# repeated elements",
+                              1,
+                              GNUNET_NO);
+    ke->received = GNUNET_YES;
+    GNUNET_free (ee);
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Registering new element from remote peer\n");
+    op->state->received_fresh += 1;
+    op_register_element (op, ee, GNUNET_YES);
+    /* only send results immediately if the client wants it */
+    switch (op->spec->result_mode)
+    {
+      case GNUNET_SET_RESULT_ADDED:
+        send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
+        break;
+      case GNUNET_SET_RESULT_SYMMETRIC:
+        send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
+        break;
+      default:
+        /* Result mode not supported, should have been caught earlier. */
+        GNUNET_break (0);
+        break;
+    }
+  }
+
+  if ( (GNUNET_YES == op->spec->byzantine) && 
+       (op->state->received_total > 384 + op->state->received_fresh * 4) && 
+       (op->state->received_fresh < op->state->received_total / 6) )
+  {
+    /* The other peer gave us lots of old elements, there's something wrong. */
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         "Other peer sent only %llu/%llu fresh elements, failing operation\n",
+         (unsigned long long) op->state->received_fresh,
+         (unsigned long long) op->state->received_total);
+    GNUNET_break_op (0);
+    fail_union_operation (op);
     return;
   }
     return;
   }
+}
+
+/**
+ * Send offers (for GNUNET_Hash-es) in response
+ * to inquiries (for IBF_Key-s).
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+static void
+handle_p2p_inquiry (void *cls,
+                    const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+  const struct IBF_Key *ibf_key;
+  unsigned int num_keys;
+  struct InquiryMessage *msg;
 
 
-  ibf_key = (struct IBF_Key *) &mh[1];
+  /* look up elements and send them */
+  if (op->state->phase != PHASE_INVENTORY_PASSIVE)
+  {
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+  num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
+      / sizeof (struct IBF_Key);
+  if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
+      != num_keys * sizeof (struct IBF_Key))
+  {
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+
+  msg = (struct InquiryMessage *) mh;
+
+  ibf_key = (const struct IBF_Key *) &msg[1];
   while (0 != num_keys--)
   {
   while (0 != num_keys--)
   {
-    send_elements_for_key (eo, *ibf_key);
+    struct IBF_Key unsalted_key;
+    unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
+    send_offers_for_key (op, unsalted_key);
     ibf_key++;
   }
 }
 
 
 /**
     ibf_key++;
   }
 }
 
 
 /**
- * Callback used for notifications
+ * Iterator over hash map entries, called to
+ * destroy the linked list of colliding ibf key entries.
  *
  * @param cls closure
  *
  * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return #GNUNET_YES if we should continue to iterate,
+ *         #GNUNET_NO if not.
+ */
+static int
+send_missing_elements_iter (void *cls,
+                            uint32_t key,
+                            void *value)
+{
+  struct Operation *op = cls;
+  struct KeyEntry *ke = value;
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_SET_ElementMessage *emsg;
+  struct ElementEntry *ee = ke->element;
+
+  if (GNUNET_YES == ke->received)
+    return GNUNET_YES;
+
+  ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
+  GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
+  emsg->reserved = htons (0);
+  emsg->element_type = htons (ee->element.element_type);
+  GNUNET_MQ_send (op->mq, ev);
+
+  return GNUNET_YES;
+}
+
+
+/**
+ * Handle a 
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
  */
 static void
  */
 static void
-peer_done_sent_cb (void *cls)
+handle_p2p_request_full (void *cls,
+                         const struct GNUNET_MessageHeader *mh)
 {
 {
-  struct OperationState *eo = cls;
+  struct Operation *op = cls;
 
 
-  send_client_done_and_destroy (eo);
+  if (PHASE_EXPECT_IBF != op->state->phase)
+  {
+    fail_union_operation (op);
+    GNUNET_break_op (0);
+    return;
+  }
+
+  // FIXME: we need to check that our set is larger than the
+  // byzantine_lower_bound by some threshold
+  send_full_set (op);
 }
 
 
 /**
 }
 
 
 /**
- * Handle a done message from a remote peer
- * 
- * @param cls the union operation
- * @param mh the message
+ * Handle a "full done" message.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
  */
 static void
  */
 static void
-handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
+handle_p2p_full_done (void *cls,
+                      const struct GNUNET_MessageHeader *mh)
 {
 {
-  struct OperationState *eo = cls;
+  struct Operation *op = cls;
 
 
-  if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
+  if (PHASE_EXPECT_IBF == op->state->phase)
   {
   {
-    /* we got all requests, but still have to send our elements as response */
     struct GNUNET_MQ_Envelope *ev;
 
     struct GNUNET_MQ_Envelope *ev;
 
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
-    eo->phase = PHASE_FINISHED;
-    ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
-    GNUNET_MQ_notify_sent (ev, peer_done_sent_cb, eo);
-    GNUNET_MQ_send (eo->mq, ev);
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
+
+    /* send all the elements that did not come from the remote peer */
+    GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+                                             &send_missing_elements_iter,
+                                             op);
+
+    ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+    GNUNET_MQ_send (op->mq, ev);
+    op->state->phase = PHASE_DONE;
+
+    /* we now wait until the other peer shuts the tunnel down*/
+  }
+  else if (PHASE_FULL_SENDING == op->state->phase)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
+    /* We sent the full set, and got the response for that.  We're done. */
+    op->state->phase = PHASE_DONE;
+    send_done_and_destroy (op);
+  }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+}
+
+
+/**
+ * Handle a demand by the other peer for elements based on a list
+ * of GNUNET_HashCode-s.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ */
+static void
+handle_p2p_demand (void *cls,
+                   const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+  struct ElementEntry *ee;
+  struct GNUNET_SET_ElementMessage *emsg;
+  const struct GNUNET_HashCode *hash;
+  unsigned int num_hashes;
+  struct GNUNET_MQ_Envelope *ev;
+
+  num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+    / sizeof (struct GNUNET_HashCode);
+  if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+      != num_hashes * sizeof (struct GNUNET_HashCode))
+  {
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+
+  for (hash = (const struct GNUNET_HashCode *) &mh[1];
+       num_hashes > 0;
+       hash++, num_hashes--)
+  {
+    ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
+    if (NULL == ee)
+    {
+      /* Demand for non-existing element. */
+      GNUNET_break_op (0);
+      fail_union_operation (op);
+      return;
+    }
+    if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
+    {
+      /* Probably confused lazily copied sets. */
+      GNUNET_break_op (0);
+      fail_union_operation (op);
+      return;
+    }
+    ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
+    GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
+    emsg->reserved = htons (0);
+    emsg->element_type = htons (ee->element.element_type);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
+         (void *) op,
+         (unsigned int) ee->element.size,
+         GNUNET_h2s (&ee->element_hash));
+    GNUNET_MQ_send (op->mq, ev);
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# exchanged elements",
+                              1,
+                              GNUNET_NO);
+
+    switch (op->spec->result_mode)
+    {
+      case GNUNET_SET_RESULT_ADDED:
+        /* Nothing to do. */
+        break;
+      case GNUNET_SET_RESULT_SYMMETRIC:
+        send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
+        break;
+      default:
+        /* Result mode not supported, should have been caught earlier. */
+        GNUNET_break (0);
+        break;
+    }
+  }
+}
+
+
+/**
+ * Handle offers (of GNUNET_HashCode-s) and
+ * respond with demands (of GNUNET_HashCode-s).
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+static void
+handle_p2p_offer (void *cls,
+                    const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+  const struct GNUNET_HashCode *hash;
+  unsigned int num_hashes;
+
+  /* look up elements and send them */
+  if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
+       (op->state->phase != PHASE_INVENTORY_ACTIVE))
+  {
+    GNUNET_break_op (0);
+    fail_union_operation (op);
     return;
   }
     return;
   }
-  if (eo->phase == PHASE_EXPECT_ELEMENTS)
+  num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+    / sizeof (struct GNUNET_HashCode);
+  if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+      != num_hashes * sizeof (struct GNUNET_HashCode))
   {
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
-    eo->phase = PHASE_FINISHED;
-    send_client_done_and_destroy (eo);
+    GNUNET_break_op (0);
+    fail_union_operation (op);
     return;
   }
     return;
   }
-  GNUNET_break (0);
-  fail_union_operation (eo);
+
+  for (hash = (const struct GNUNET_HashCode *) &mh[1];
+       num_hashes > 0;
+       hash++, num_hashes--)
+  {
+    struct ElementEntry *ee;
+    struct GNUNET_MessageHeader *demands;
+    struct GNUNET_MQ_Envelope *ev;
+
+    ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
+                                            hash);
+    if (NULL != ee)
+      if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
+        continue;
+
+    if (GNUNET_YES ==
+        GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
+                                                hash))
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Skipped sending duplicate demand\n");
+      continue;
+    }
+
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
+                                                      hash,
+                                                      NULL,
+                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
+
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "[OP %x] Requesting element (hash %s)\n",
+         (void *) op, GNUNET_h2s (hash));
+    ev = GNUNET_MQ_msg_header_extra (demands,
+                                     sizeof (struct GNUNET_HashCode),
+                                     GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
+    *(struct GNUNET_HashCode *) &demands[1] = *hash;
+    GNUNET_MQ_send (op->mq, ev);
+  }
 }
 
 
 /**
 }
 
 
 /**
- * Evaluate a union operation with
- * a remote peer.
+ * Handle a done message from a remote peer
  *
  *
- * @param spec specification of the operation the evaluate
- * @param tunnel tunnel already connected to the partner peer
- * @param tc tunnel context, passed here so all new incoming
- *        messages are directly going to the union operations
- * @return a handle to the operation
+ * @param cls the union operation
+ * @param mh the message
  */
 static void
  */
 static void
-union_evaluate (struct OperationSpecification *spec,
-                struct GNUNET_MESH_Tunnel *tunnel,
-                struct TunnelContext *tc)
+handle_p2p_done (void *cls,
+                 const struct GNUNET_MessageHeader *mh)
 {
 {
-  struct OperationState *eo;
+  struct Operation *op = cls;
+
+  if (op->state->phase == PHASE_INVENTORY_PASSIVE)
+  {
+    /* We got all requests, but still have to send our elements in response. */
+
+    op->state->phase = PHASE_FINISH_WAITING;
+
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "got DONE (as passive partner), waiting for our demands to be satisfied\n");
+    /* The active peer is done sending offers
+     * and inquiries.  This means that all
+     * our responses to that (demands and offers)
+     * must be in flight (queued or in mesh).
+     *
+     * We should notify the active peer once
+     * all our demands are satisfied, so that the active
+     * peer can quit if we gave him everything.
+     */
+    maybe_finish (op);
+    return;
+  }
+  if (op->state->phase == PHASE_INVENTORY_ACTIVE)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "got DONE (as active partner), waiting to finish\n");
+    /* All demands of the other peer are satisfied,
+     * and we processed all offers, thus we know
+     * exactly what our demands must be.
+     *
+     * We'll close the channel
+     * to the other peer once our demands are met.
+     */
+    op->state->phase = PHASE_FINISH_CLOSING;
+    maybe_finish (op);
+    return;
+  }
+  GNUNET_break_op (0);
+  fail_union_operation (op);
+}
 
 
-  eo = GNUNET_new (struct OperationState);
-  tc->vt = _GSS_union_vt ();
-  tc->op = eo;
-  eo->se = strata_estimator_dup (spec->set->state->se);
-  eo->set_state = spec->set->state;
-  eo->spec = spec;
-  eo->tunnel = tunnel;
-  eo->mq = GNUNET_MESH_mq_create (tunnel);
 
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
-             "evaluating union operation, (app %s)\n", 
-              GNUNET_h2s (&eo->spec->app_id));
+/**
+ * Initiate operation to evaluate a set union with a remote peer.
+ *
+ * @param op operation to perform (to be initialized)
+ * @param opaque_context message to be transmitted to the listener
+ *        to convince him to accept, may be NULL
+ */
+static void
+union_evaluate (struct Operation *op,
+                const struct GNUNET_MessageHeader *opaque_context)
+{
+  struct GNUNET_MQ_Envelope *ev;
+  struct OperationRequestMessage *msg;
 
 
+  GNUNET_assert (NULL == op->state);
+  op->state = GNUNET_new (struct OperationState);
+  op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
+  /* copy the current generation's strata estimator for this operation */
+  op->state->se = strata_estimator_dup (op->spec->set->state->se);
   /* we started the operation, thus we have to send the operation request */
   /* we started the operation, thus we have to send the operation request */
-  eo->phase = PHASE_EXPECT_SE;
+  op->state->phase = PHASE_EXPECT_SE;
+  op->state->salt_receive = op->state->salt_send = 42;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Initiating union operation evaluation\n");
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# of total union operations",
+                            1,
+                            GNUNET_NO);
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# of initiated union operations",
+                            1,
+                            GNUNET_NO);
+  ev = GNUNET_MQ_msg_nested_mh (msg,
+                                GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
+                                opaque_context);
+  if (NULL == ev)
+  {
+    /* the context message is too large */
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (op->spec->set->client);
+    return;
+  }
+  msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
+  GNUNET_MQ_send (op->mq,
+                  ev);
 
 
-  GNUNET_CONTAINER_DLL_insert (eo->set_state->ops_head,
-                               eo->set_state->ops_tail,
-                               eo);
+  if (NULL != opaque_context)
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "sent op request with context message\n");
+  else
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "sent op request without context message\n");
 
 
-  send_operation_request (eo);
+  initialize_key_to_element (op);
+  op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
 }
 
 
 /**
 }
 
 
 /**
- * Accept an union operation request from a remote peer
+ * Accept an union operation request from a remote peer.
+ * Only initializes the private operation state.
  *
  *
- * @param spec all necessary information about the operation
- * @param tunnel open tunnel to the partner's peer
- * @param tc tunnel context, passed here so all new incoming
- *        messages are directly going to the union operations
- * @return operation
+ * @param op operation that will be accepted as a union operation
  */
 static void
  */
 static void
-union_accept (struct OperationSpecification *spec,
-              struct GNUNET_MESH_Tunnel *tunnel,
-              struct TunnelContext *tc)
+union_accept (struct Operation *op)
 {
 {
-  struct OperationState *eo;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
-
-  eo = GNUNET_new (struct OperationState);
-  tc->vt = _GSS_union_vt ();
-  tc->op = eo;
-  eo->set_state = spec->set->state;
-  eo->generation_created = eo->set_state->current_generation++;
-  eo->spec = spec;
-  eo->tunnel = tunnel;
-  eo->mq = GNUNET_MESH_mq_create (tunnel);
-  eo->se = strata_estimator_dup (eo->set_state->se);
-  /* transfer ownership of mq and socket from incoming to eo */
-  GNUNET_CONTAINER_DLL_insert (eo->set_state->ops_head,
-                               eo->set_state->ops_tail,
-                               eo);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "accepting set union operation\n");
+  GNUNET_assert (NULL == op->state);
+
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# of accepted union operations",
+                            1,
+                            GNUNET_NO);
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# of total union operations",
+                            1,
+                            GNUNET_NO);
+
+  op->state = GNUNET_new (struct OperationState);
+  op->state->se = strata_estimator_dup (op->spec->set->state->se);
+  op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
+  op->state->salt_receive = op->state->salt_send = 42;
+  initialize_key_to_element (op);
+  op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
   /* kick off the operation */
   /* kick off the operation */
-  send_strata_estimator (eo);
+  send_strata_estimator (op);
 }
 
 
 /**
  * Create a new set supporting the union operation
  *
 }
 
 
 /**
  * Create a new set supporting the union operation
  *
- * @return the newly created set
+ * We maintain one strata estimator per set and then manipulate it over the
+ * lifetime of the set, as recreating a strata estimator would be expensive.
+ *
+ * @return the newly created set, NULL on error
  */
 static struct SetState *
 union_set_create (void)
 {
   struct SetState *set_state;
 
  */
 static struct SetState *
 union_set_create (void)
 {
   struct SetState *set_state;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n");
-  
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "union set created\n");
   set_state = GNUNET_new (struct SetState);
   set_state = GNUNET_new (struct SetState);
-  /* keys of the hash map are stored in the element entrys, thus we do not
-   * want the hash map to copy them */
-  set_state->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
-                                              SE_IBF_SIZE, SE_IBF_HASH_NUM);  
+                                           SE_IBF_SIZE, SE_IBF_HASH_NUM);
+  if (NULL == set_state->se)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Failed to allocate strata estimator\n");
+    GNUNET_free (set_state);
+    return NULL;
+  }
   return set_state;
 }
 
   return set_state;
 }
 
@@ -1257,170 +2063,165 @@ union_set_create (void)
  * Add the element from the given element message to the set.
  *
  * @param set_state state of the set want to add to
  * Add the element from the given element message to the set.
  *
  * @param set_state state of the set want to add to
- * @param element the element to add to the set
+ * @param ee the element to add to the set
  */
 static void
  */
 static void
-union_add (struct SetState *set_state, const struct GNUNET_SET_Element *element)
+union_add (struct SetState *set_state, struct ElementEntry *ee)
 {
 {
-  struct ElementEntry *ee;
-  struct ElementEntry *ee_dup;
+  strata_estimator_insert (set_state->se,
+                           get_ibf_key (&ee->element_hash));
+}
 
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "adding union element of size %u\n", element->size);
 
 
-  ee = GNUNET_malloc (element->size + sizeof *ee);
-  ee->element.size = element->size;
-  memcpy (&ee[1], element->data, element->size);
-  ee->element.data = &ee[1];
-  ee->generation_added = set_state->current_generation;
-  ee->remote = GNUNET_NO;
-  GNUNET_CRYPTO_hash (ee->element.data, element->size, &ee->element_hash);
-  ee_dup = GNUNET_CONTAINER_multihashmap_get (set_state->elements,
-                                              &ee->element_hash);
-  if (NULL != ee_dup)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
-    GNUNET_free (ee);
-    return;
-  }
-  GNUNET_CONTAINER_multihashmap_put (set_state->elements, &ee->element_hash, ee,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
-  strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
+/**
+ * Remove the element given in the element message from the set.
+ * Only marks the element as removed, so that older set operations can still exchange it.
+ *
+ * @param set_state state of the set to remove from
+ * @param ee set element to remove
+ */
+static void
+union_remove (struct SetState *set_state, struct ElementEntry *ee)
+{
+  strata_estimator_remove (set_state->se,
+                           get_ibf_key (&ee->element_hash));
 }
 
 
 /**
 }
 
 
 /**
- * Destroy a set that supports the union operation
+ * Destroy a set that supports the union operation.
  *
  * @param set_state the set to destroy
  */
 static void
 union_set_destroy (struct SetState *set_state)
 {
  *
  * @param set_state the set to destroy
  */
 static void
 union_set_destroy (struct SetState *set_state)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n");
-  /* important to destroy operations before the rest of the set */
-  while (NULL != set_state->ops_head)
-    union_operation_destroy (set_state->ops_head);
   if (NULL != set_state->se)
   {
     strata_estimator_destroy (set_state->se);
     set_state->se = NULL;
   }
   if (NULL != set_state->se)
   {
     strata_estimator_destroy (set_state->se);
     set_state->se = NULL;
   }
-  if (NULL != set_state->elements)
-  {
-    GNUNET_CONTAINER_multihashmap_iterate (set_state->elements,
-                                           destroy_elements_iterator, NULL);
-    GNUNET_CONTAINER_multihashmap_destroy (set_state->elements);
-    set_state->elements = NULL;
-  }
-
   GNUNET_free (set_state);
 }
 
   GNUNET_free (set_state);
 }
 
-/**
- * Remove the element given in the element message from the set.
- * Only marks the element as removed, so that older set operations can still exchange it.
- *
- * @param set_state state of the set to remove from
- * @param element set element to remove
- */
-static void
-union_remove (struct SetState *set_state, const struct GNUNET_SET_Element *element)
-{
-  struct GNUNET_HashCode hash;
-  struct ElementEntry *ee;
-
-  GNUNET_CRYPTO_hash (element->data, element->size, &hash);
-  ee = GNUNET_CONTAINER_multihashmap_get (set_state->elements, &hash);
-  if (NULL == ee)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
-    return;
-  }
-  if (GNUNET_YES == ee->removed)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
-    return;
-  }
-  ee->removed = GNUNET_YES;
-  ee->generation_removed = set_state->current_generation;
-}
-
 
 /**
  * Dispatch messages for a union operation.
  *
 
 /**
  * Dispatch messages for a union operation.
  *
- * @param eo the state of the union evaluate operation
+ * @param op the state of the union evaluate operation
  * @param mh the received message
  * @param mh the received message
- * @return GNUNET_SYSERR if the tunnel should be disconnected,
- *         GNUNET_OK otherwise
+ * @return #GNUNET_SYSERR if the tunnel should be disconnected,
+ *         #GNUNET_OK otherwise
  */
 int
  */
 int
-union_handle_p2p_message (struct OperationState *eo,
+union_handle_p2p_message (struct Operation *op,
                           const struct GNUNET_MessageHeader *mh)
 {
                           const struct GNUNET_MessageHeader *mh)
 {
+  //LOG (GNUNET_ERROR_TYPE_DEBUG,
+  //            "received p2p message (t: %u, s: %u)\n",
+  //            ntohs (mh->type),
+  //            ntohs (mh->size));
   switch (ntohs (mh->type))
   {
   switch (ntohs (mh->type))
   {
-    case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
-      handle_p2p_ibf (eo, mh);
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
+      return handle_p2p_ibf (op, mh);
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
+      return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
+      return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
+    case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
+      handle_p2p_elements (op, mh);
       break;
       break;
-    case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
-      handle_p2p_strata_estimator (eo, mh);
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
+      handle_p2p_full_element (op, mh);
       break;
       break;
-    case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
-      handle_p2p_elements (eo, mh);
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
+      handle_p2p_inquiry (op, mh);
+      break;
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
+      handle_p2p_done (op, mh);
+      break;
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
+      handle_p2p_offer (op, mh);
       break;
       break;
-    case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
-      handle_p2p_element_requests (eo, mh);
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
+      handle_p2p_demand (op, mh);
       break;
       break;
-    case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
-      handle_p2p_done (eo, mh);
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
+      handle_p2p_full_done (op, mh);
+      break;
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
+      handle_p2p_request_full (op, mh);
       break;
     default:
       break;
     default:
-      /* something wrong with mesh's message handlers? */
+      /* Something wrong with cadet's message handlers? */
       GNUNET_assert (0);
   }
   return GNUNET_OK;
 }
 
 
       GNUNET_assert (0);
   }
   return GNUNET_OK;
 }
 
 
+/**
+ * Handler for peer-disconnects, notifies the client
+ * about the aborted operation in case the op was not concluded.
+ *
+ * @param op the destroyed operation
+ */
 static void
 static void
-union_peer_disconnect (struct OperationState *op)
+union_peer_disconnect (struct Operation *op)
 {
 {
-  /* Are we already disconnected? */
-  if (NULL == op->tunnel)
-    return;
-  op->tunnel = NULL;
-  if (NULL != op->mq)
-  {
-    GNUNET_MQ_destroy (op->mq);
-    op->mq = NULL;
-  }
-  if (PHASE_FINISHED != op->phase)
+  if (PHASE_DONE != op->state->phase)
   {
     struct GNUNET_MQ_Envelope *ev;
     struct GNUNET_SET_ResultMessage *msg;
   {
     struct GNUNET_MQ_Envelope *ev;
     struct GNUNET_SET_ResultMessage *msg;
-    ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
+
+    ev = GNUNET_MQ_msg (msg,
+                        GNUNET_MESSAGE_TYPE_SET_RESULT);
     msg->request_id = htonl (op->spec->client_request_id);
     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
     msg->element_type = htons (0);
     msg->request_id = htonl (op->spec->client_request_id);
     msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
     msg->element_type = htons (0);
-    GNUNET_MQ_send (op->spec->set->client_mq, ev);
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
-  }
-  else
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
+    GNUNET_MQ_send (op->spec->set->client_mq,
+                    ev);
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "other peer disconnected prematurely, phase %u\n",
+         op->state->phase);
+    _GSS_operation_destroy (op,
+                            GNUNET_YES);
+    return;
   }
   }
-  union_operation_destroy (op);
+  // else: the session has already been concluded
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "other peer disconnected (finished)\n");
+  if (GNUNET_NO == op->state->client_done_sent)
+    send_done_and_destroy (op);
 }
 
 
 }
 
 
-static void
-union_op_cancel (struct SetState *set_state, uint32_t op_id)
+/**
+ * Copy union-specific set state.
+ *
+ * @param set source set for copying the union state
+ * @return a copy of the union-specific set state
+ */
+static struct SetState *
+union_copy_state (struct Set *set)
 {
 {
-  /* FIXME: implement */
+  struct SetState *new_state;
+
+  new_state = GNUNET_new (struct SetState);
+  GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) );
+  new_state->se = strata_estimator_dup (set->state->se);
+
+  return new_state;
 }
 
 
 }
 
 
+/**
+ * Get the table with implementing functions for
+ * set union.
+ *
+ * @return the operation specific VTable
+ */
 const struct SetVT *
 _GSS_union_vt ()
 {
 const struct SetVT *
 _GSS_union_vt ()
 {
@@ -1433,7 +2234,8 @@ _GSS_union_vt ()
     .evaluate = &union_evaluate,
     .accept = &union_accept,
     .peer_disconnect = &union_peer_disconnect,
     .evaluate = &union_evaluate,
     .accept = &union_accept,
     .peer_disconnect = &union_peer_disconnect,
-    .cancel = &union_op_cancel
+    .cancel = &union_op_cancel,
+    .copy_state = &union_copy_state,
   };
 
   return &union_vt;
   };
 
   return &union_vt;