set: destroy client mq properly
[oweals/gnunet.git] / src / set / gnunet-service-set_union.c
index 78975749a5c337851df4f133b07e952b2af3cd5d..f46713c3102d25b403b3aea2ae369d6915a0e9d0 100644 (file)
@@ -1,6 +1,6 @@
 /*
       This file is part of GNUnet
-      Copyright (C) 2013-2015 Christian Grothoff (and other contributing authors)
+      Copyright (C) 2013-2016 GNUnet e.V.
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -19,6 +19,7 @@
 */
 /**
  * @file set/gnunet-service-set_union.c
+
  * @brief two-peer set operations
  * @author Florian Dold
  */
@@ -60,7 +61,7 @@
  * Choose this value so that computing the IBF is still cheaper
  * than transmitting all values.
  */
-#define MAX_IBF_ORDER (16)
+#define MAX_IBF_ORDER (20)
 
 /**
  * Number of buckets used in the ibf per estimated
@@ -84,6 +85,7 @@ enum UnionOperationPhase
    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
    *
    * XXX: could use better wording.
+   * XXX: repurposed to also expect a "request full set" message, should be renamed
    *
    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
    */
@@ -114,14 +116,22 @@ enum UnionOperationPhase
    * In the penultimate phase,
    * we wait until all our demands
    * are satisfied.  Then we send a done
-   * message, and wait for another done message.*/
+   * message, and wait for another done message.
+   */
   PHASE_FINISH_WAITING,
 
   /**
    * In the ultimate phase, we wait until
    * our demands are satisfied and then
-   * quit (sending another DONE message). */
-  PHASE_DONE
+   * quit (sending another DONE message).
+   */
+  PHASE_DONE,
+
+  /**
+   * After sending the full set, wait for responses with the elements
+   * that the local peer is missing.
+   */
+  PHASE_FULL_SENDING,
 };
 
 
@@ -147,7 +157,7 @@ struct OperationState
   struct InvertibleBloomFilter *local_ibf;
 
   /**
-   * Maps IBF-Keys (specific to the current salt) to elements.
+   * Maps unsalted IBF-Keys to elements.
    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
    * Colliding IBF-Keys are linked.
    */
@@ -172,6 +182,33 @@ struct OperationState
    * Hashes for elements that we have demanded from the other peer.
    */
   struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
+
+  /**
+   * Salt that we're using for sending IBFs
+   */
+  uint32_t salt_send;
+
+  /**
+   * Salt for the IBF we've received and that we're currently decoding.
+   */
+  uint32_t salt_receive;
+
+  /**
+   * Number of elements we received from the other peer
+   * that were not in the local set yet.
+   */
+  uint32_t received_fresh;
+
+  /**
+   * Total number of elements received from the other peer.
+   */
+  uint32_t received_total;
+
+  /**
+   * Initial size of our set, just before
+   * the operation started.
+   */
+  uint64_t initial_size;
 };
 
 
@@ -192,6 +229,14 @@ struct KeyEntry
    * is #GNUNET_YES.
    */
   struct ElementEntry *element;
+
+  /**
+   * Did we receive this element?
+   * Even if element->is_foreign is false, we might
+   * have received the element, so this indicates that
+   * the other peer has it.
+   */
+  int received;
 };
 
 
@@ -334,18 +379,13 @@ fail_union_operation (struct Operation *op)
  * a salt.
  *
  * @param src the hash code
- * @param salt salt to use
  * @return the derived IBF key
  */
 static struct IBF_Key
-get_ibf_key (const struct GNUNET_HashCode *src,
-             uint16_t salt)
+get_ibf_key (const struct GNUNET_HashCode *src)
 {
   struct IBF_Key key;
-
-  /* FIXME: Ensure that the salt is handled correctly.
-     This is a quick fix so that consensus works for now. */
-  salt = 0;
+  uint16_t salt = 0;
 
   GNUNET_CRYPTO_kdf (&key, sizeof (key),
                      src, sizeof *src,
@@ -355,6 +395,16 @@ get_ibf_key (const struct GNUNET_HashCode *src,
 }
 
 
+/**
+ * Context for #op_get_element_iterator
+ */
+struct GetElementContext
+{
+  struct GNUNET_HashCode hash;
+  struct KeyEntry *k;
+};
+
+
 /**
  * Iterator over the mapping from IBF keys to element entries.  Checks if we
  * have an element with a given GNUNET_HashCode.
@@ -366,17 +416,20 @@ get_ibf_key (const struct GNUNET_HashCode *src,
  *         #GNUNET_NO if we've found the element.
  */
 static int
-op_has_element_iterator (void *cls,
+op_get_element_iterator (void *cls,
                          uint32_t key,
                          void *value)
 {
-  struct GNUNET_HashCode *element_hash = cls;
+  struct GetElementContext *ctx = cls;
   struct KeyEntry *k = value;
 
   GNUNET_assert (NULL != k);
   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
-                                   element_hash))
+                                   &ctx->hash))
+  {
+    ctx->k = k;
     return GNUNET_NO;
+  }
   return GNUNET_YES;
 }
 
@@ -389,23 +442,29 @@ op_has_element_iterator (void *cls,
  * @param element_hash hash of the element to look for
  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
  */
-static int
-op_has_element (struct Operation *op,
+static struct KeyEntry *
+op_get_element (struct Operation *op,
                 const struct GNUNET_HashCode *element_hash)
 {
   int ret;
   struct IBF_Key ibf_key;
+  struct GetElementContext ctx = {{{ 0 }} , 0};
 
-  ibf_key = get_ibf_key (element_hash, op->spec->salt);
+  ctx.hash = *element_hash;
+
+  ibf_key = get_ibf_key (element_hash);
   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
                                                       (uint32_t) ibf_key.key_val,
-                                                      op_has_element_iterator,
-                                                      (void *) element_hash);
+                                                      op_get_element_iterator,
+                                                      &ctx);
 
   /* was the iteration aborted because we found the element? */
   if (GNUNET_SYSERR == ret)
-    return GNUNET_YES;
-  return GNUNET_NO;
+  {
+    GNUNET_assert (NULL != ctx.k);
+    return ctx.k;
+  }
+  return NULL;
 }
 
 
@@ -421,18 +480,21 @@ op_has_element (struct Operation *op,
  *
  * @param op the union operation
  * @param ee the element entry
+ * @parem received was this element received from the remote peer?
  */
 static void
 op_register_element (struct Operation *op,
-                     struct ElementEntry *ee)
+                     struct ElementEntry *ee,
+                     int received)
 {
   struct IBF_Key ibf_key;
   struct KeyEntry *k;
 
-  ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
+  ibf_key = get_ibf_key (&ee->element_hash);
   k = GNUNET_new (struct KeyEntry);
   k->element = ee;
   k->ibf_key = ibf_key;
+  k->received = received;
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
                                                       (uint32_t) ibf_key.key_val,
@@ -441,6 +503,31 @@ op_register_element (struct Operation *op,
 }
 
 
+static void
+salt_key (const struct IBF_Key *k_in,
+          uint32_t salt,
+          struct IBF_Key *k_out)
+{
+  int s = salt % 64;
+  uint64_t x = k_in->key_val;
+  /* rotate ibf key */
+  x = (x >> s) | (x << (64 - s));
+  k_out->key_val = x;
+}
+
+
+static void
+unsalt_key (const struct IBF_Key *k_in,
+            uint32_t salt,
+            struct IBF_Key *k_out)
+{
+  int s = salt % 64;
+  uint64_t x = k_in->key_val;
+  x = (x << s) | (x >> (64 - s));
+  k_out->key_val = x;
+}
+
+
 /**
  * Insert a key into an ibf.
  *
@@ -455,13 +542,15 @@ prepare_ibf_iterator (void *cls,
 {
   struct Operation *op = cls;
   struct KeyEntry *ke = value;
+  struct IBF_Key salted_key;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "[OP %x] inserting %lx (hash %s) into ibf\n",
        (void *) op,
        (unsigned long) ke->ibf_key.key_val,
        GNUNET_h2s (&ke->element->element_hash));
-  ibf_insert (op->state->local_ibf, ke->ibf_key);
+  salt_key (&ke->ibf_key, op->state->salt_send, &salted_key);
+  ibf_insert (op->state->local_ibf, salted_key);
   return GNUNET_YES;
 }
 
@@ -491,11 +580,29 @@ init_key_to_element_iterator (void *cls,
 
   GNUNET_assert (GNUNET_NO == ee->remote);
 
-  op_register_element (op, ee);
+  op_register_element (op, ee, GNUNET_NO);
   return GNUNET_YES;
 }
 
 
+/**
+ * Initialize the IBF key to element mapping local to this set
+ * operation.
+ *
+ * @param op the set union operation
+ */
+static void
+initialize_key_to_element (struct Operation *op)
+{
+  unsigned int len;
+
+  GNUNET_assert (NULL == op->state->key_to_element);
+  len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
+  op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
+  GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op);
+}
+
+
 /**
  * Create an ibf with the operation's elements
  * of the specified size
@@ -508,15 +615,8 @@ static int
 prepare_ibf (struct Operation *op,
              uint32_t size)
 {
-  if (NULL == op->state->key_to_element)
-  {
-    unsigned int len;
+  GNUNET_assert (NULL != op->state->key_to_element);
 
-    len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
-    op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
-    GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
-                                           init_key_to_element_iterator, op);
-  }
   if (NULL != op->state->local_ibf)
     ibf_destroy (op->state->local_ibf);
   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
@@ -582,9 +682,11 @@ send_ibf (struct Operation *op,
     ev = GNUNET_MQ_msg_extra (msg,
                               buckets_in_message * IBF_BUCKET_SIZE,
                               GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
-    msg->reserved = 0;
+    msg->reserved1 = 0;
+    msg->reserved2 = 0;
     msg->order = ibf_order;
-    msg->offset = htons (buckets_sent);
+    msg->offset = htonl (buckets_sent);
+    msg->salt = htonl (op->state->salt_send);
     ibf_write_slice (ibf, buckets_sent,
                      buckets_in_message, &msg[1]);
     buckets_sent += buckets_in_message;
@@ -613,7 +715,7 @@ send_strata_estimator (struct Operation *op)
 {
   const struct StrataEstimator *se = op->state->se;
   struct GNUNET_MQ_Envelope *ev;
-  struct GNUNET_MessageHeader *strata_msg;
+  struct StrataEstimatorMessage *strata_msg;
   char *buf;
   size_t len;
   uint16_t type;
@@ -625,13 +727,14 @@ send_strata_estimator (struct Operation *op)
     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
   else
     type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
-  ev = GNUNET_MQ_msg_header_extra (strata_msg,
-                                   len,
-                                   type);
-  memcpy (&strata_msg[1],
+  ev = GNUNET_MQ_msg_extra (strata_msg,
+                            len,
+                            type);
+  GNUNET_memcpy (&strata_msg[1],
           buf,
           len);
   GNUNET_free (buf);
+  strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
   GNUNET_MQ_send (op->mq,
                   ev);
   op->state->phase = PHASE_EXPECT_IBF;
@@ -658,7 +761,51 @@ get_order_from_difference (unsigned int diff)
     ibf_order++;
   if (ibf_order > MAX_IBF_ORDER)
     ibf_order = MAX_IBF_ORDER;
-  return ibf_order;
+  // add one for correction
+  return ibf_order + 1;
+}
+
+
+/**
+ * Send a set element.
+ *
+ * @param cls the union operation `struct Operation *`
+ * @param key unused
+ * @param value the `struct ElementEntry *` to insert
+ *        into the key-to-element mapping
+ * @return #GNUNET_YES (to continue iterating)
+ */
+static int
+send_element_iterator (void *cls,
+                       const struct GNUNET_HashCode *key,
+                       void *value)
+{
+  struct Operation *op = cls;
+  struct GNUNET_SET_ElementMessage *emsg;
+  struct ElementEntry *ee = value;
+  struct GNUNET_SET_Element *el = &ee->element;
+  struct GNUNET_MQ_Envelope *ev;
+
+
+  ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
+  emsg->element_type = htons (el->element_type);
+  GNUNET_memcpy (&emsg[1], el->data, el->size);
+  GNUNET_MQ_send (op->mq, ev);
+  return GNUNET_YES;
+}
+
+
+static void
+send_full_set (struct Operation *op)
+{
+  struct GNUNET_MQ_Envelope *ev;
+
+  op->state->phase = PHASE_FULL_SENDING;
+
+  (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
+                                                &send_element_iterator, op);
+  ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+  GNUNET_MQ_send (op->mq, ev);
 }
 
 
@@ -678,16 +825,23 @@ handle_p2p_strata_estimator (void *cls,
 {
   struct Operation *op = cls;
   struct StrataEstimator *remote_se;
-  int diff;
+  struct StrataEstimatorMessage *msg = (void *) mh;
+  unsigned int diff;
+  uint64_t other_size;
   size_t len;
 
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# bytes of SE received",
+                            ntohs (mh->size),
+                            GNUNET_NO);
+
   if (op->state->phase != PHASE_EXPECT_SE)
   {
-    fail_union_operation (op);
     GNUNET_break (0);
+    fail_union_operation (op);
     return GNUNET_SYSERR;
   }
-  len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
+  len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
   if ( (GNUNET_NO == is_compressed) &&
        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
   {
@@ -695,6 +849,7 @@ handle_p2p_strata_estimator (void *cls,
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
+  other_size = GNUNET_ntohll (msg->set_size);
   remote_se = strata_estimator_create (SE_STRATA_COUNT,
                                        SE_IBF_SIZE,
                                        SE_IBF_HASH_NUM);
@@ -705,18 +860,23 @@ handle_p2p_strata_estimator (void *cls,
     return GNUNET_SYSERR;
   }
   if (GNUNET_OK !=
-      strata_estimator_read (&mh[1],
+      strata_estimator_read (&msg[1],
                              len,
                              is_compressed,
                              remote_se))
   {
     /* decompression failed */
     fail_union_operation (op);
+    strata_estimator_destroy (remote_se);
     return GNUNET_SYSERR;
   }
   GNUNET_assert (NULL != op->state->se);
   diff = strata_estimator_difference (remote_se,
                                       op->state->se);
+
+  if (diff > 200)
+    diff = diff * 3 / 2; 
+
   strata_estimator_destroy (remote_se);
   strata_estimator_destroy (op->state->se);
   op->state->se = NULL;
@@ -724,16 +884,55 @@ handle_p2p_strata_estimator (void *cls,
        "got se diff=%d, using ibf size %d\n",
        diff,
        1<<get_order_from_difference (diff));
-  if (GNUNET_OK !=
-      send_ibf (op,
-                get_order_from_difference (diff)))
+
+  if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
   {
-    /* Internal error, best we can do is shut the connection */
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Failed to send IBF, closing connection\n");
+    GNUNET_break (0);
     fail_union_operation (op);
     return GNUNET_SYSERR;
   }
+
+
+  if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 4))
+  {
+    LOG (GNUNET_ERROR_TYPE_INFO,
+         "Sending full set (diff=%d, own set=%u)\n",
+         diff,
+         op->state->initial_size);
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# of full sends",
+                              1,
+                              GNUNET_NO);
+    if (op->state->initial_size <= other_size)
+    {
+      send_full_set (op);
+    }
+    else
+    {
+      struct GNUNET_MQ_Envelope *ev;
+      op->state->phase = PHASE_EXPECT_IBF;
+      ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
+      GNUNET_MQ_send (op->mq, ev);
+    }
+  }
+  else
+  {
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# of ibf sends",
+                              1,
+                              GNUNET_NO);
+    if (GNUNET_OK !=
+        send_ibf (op,
+                  get_order_from_difference (diff)))
+    {
+      /* Internal error, best we can do is shut the connection */
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Failed to send IBF, closing connection\n");
+      fail_union_operation (op);
+      return GNUNET_SYSERR;
+    }
+  }
+
   return GNUNET_OK;
 }
 
@@ -817,6 +1016,7 @@ decode_and_send (struct Operation *op)
   if (GNUNET_OK !=
       prepare_ibf (op, op->state->remote_ibf->size))
   {
+    GNUNET_break (0);
     /* allocation failed */
     return GNUNET_SYSERR;
   }
@@ -831,7 +1031,7 @@ decode_and_send (struct Operation *op)
        diff_ibf->size);
 
   num_decoded = 0;
-  last_key.key_val = 0;
+  key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
 
   while (1)
   {
@@ -848,7 +1048,8 @@ decode_and_send (struct Operation *op)
            (unsigned long) key.key_val);
       num_decoded += 1;
       if ( (num_decoded > diff_ibf->size) ||
-           (num_decoded > 1 && last_key.key_val == key.key_val) )
+           ( (num_decoded > 1) &&
+             (last_key.key_val == key.key_val) ) )
       {
         LOG (GNUNET_ERROR_TYPE_DEBUG,
              "detected cyclic ibf (decoded %u/%u)\n",
@@ -874,6 +1075,7 @@ decode_and_send (struct Operation *op)
                                   "# of IBF retries",
                                   1,
                                   GNUNET_NO);
+        op->state->salt_send++;
         if (GNUNET_OK !=
             send_ibf (op, next_order))
         {
@@ -915,20 +1117,22 @@ decode_and_send (struct Operation *op)
     }
     if (1 == side)
     {
-      send_offers_for_key (op, key);
+      struct IBF_Key unsalted_key;
+      unsalt_key (&key, op->state->salt_receive, &unsalted_key);
+      send_offers_for_key (op, unsalted_key);
     }
     else if (-1 == side)
     {
       struct GNUNET_MQ_Envelope *ev;
-      struct GNUNET_MessageHeader *msg;
+      struct InquiryMessage *msg;
 
       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
        * the effort additional complexity. */
-      ev = GNUNET_MQ_msg_header_extra (msg,
-                                       sizeof (struct IBF_Key),
-                                       GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
-
-      memcpy (&msg[1],
+      ev = GNUNET_MQ_msg_extra (msg,
+                                sizeof (struct IBF_Key),
+                                GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
+      msg->salt = htonl (op->state->salt_receive);
+      GNUNET_memcpy (&msg[1],
               &key,
               sizeof (struct IBF_Key));
       LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -981,6 +1185,8 @@ handle_p2p_ibf (void *cls,
          "Creating new ibf of size %u\n",
          1 << msg->order);
     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
+    op->state->salt_receive = ntohl (msg->salt);
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
     if (NULL == op->state->remote_ibf)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -989,7 +1195,7 @@ handle_p2p_ibf (void *cls,
       return GNUNET_SYSERR;
     }
     op->state->ibf_buckets_received = 0;
-    if (0 != ntohs (msg->offset))
+    if (0 != ntohl (msg->offset))
     {
       GNUNET_break_op (0);
       fail_union_operation (op);
@@ -998,8 +1204,19 @@ handle_p2p_ibf (void *cls,
   }
   else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
   {
-    if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
-         (1<<msg->order != op->state->remote_ibf->size) )
+    if (ntohl (msg->offset) != op->state->ibf_buckets_received)
+    {
+      GNUNET_break_op (0);
+      fail_union_operation (op);
+      return GNUNET_SYSERR;
+    }
+    if (1<<msg->order != op->state->remote_ibf->size)
+    {
+      GNUNET_break_op (0);
+      fail_union_operation (op);
+      return GNUNET_SYSERR;
+    }
+    if (ntohl (msg->salt) != op->state->salt_receive)
     {
       GNUNET_break_op (0);
       fail_union_operation (op);
@@ -1082,8 +1299,9 @@ send_client_element (struct Operation *op,
   }
   rm->result_status = htons (status);
   rm->request_id = htonl (op->spec->client_request_id);
-  rm->element_type = element->element_type;
-  memcpy (&rm[1], element->data, element->size);
+  rm->element_type = htons (element->element_type);
+  rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
+  GNUNET_memcpy (&rm[1], element->data, element->size);
   GNUNET_MQ_send (op->spec->set->client_mq, ev);
 }
 
@@ -1105,6 +1323,7 @@ send_done_and_destroy (void *cls)
   rm->request_id = htonl (op->spec->client_request_id);
   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
   rm->element_type = htons (0);
+  rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
   GNUNET_MQ_send (op->spec->set->client_mq, ev);
   /* Will also call the union-specific cancel function. */
   _GSS_operation_destroy (op, GNUNET_YES);
@@ -1151,6 +1370,8 @@ maybe_finish (struct Operation *op)
 
 /**
  * Handle an element message from a remote peer.
+ * Sent by the other peer either because we decoded an IBF and placed a demand,
+ * or because the other peer switched to full set transmission.
  *
  * @param cls the union operation
  * @param mh the message
@@ -1181,7 +1402,7 @@ handle_p2p_elements (void *cls,
 
   element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
-  memcpy (&ee[1], &emsg[1], element_size);
+  GNUNET_memcpy (&ee[1], &emsg[1], element_size);
   ee->element.size = element_size;
   ee->element.data = &ee[1];
   ee->element.element_type = ntohs (emsg->element_type);
@@ -1209,8 +1430,16 @@ handle_p2p_elements (void *cls,
                             "# received elements",
                             1,
                             GNUNET_NO);
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# exchanged elements",
+                            1,
+                            GNUNET_NO);
+
+  op->state->received_total += 1;
 
-  if (GNUNET_YES == op_has_element (op, &ee->element_hash))
+  struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+
+  if (NULL != ke)
   {
     /* Got repeated element.  Should not happen since
      * we track demands. */
@@ -1218,13 +1447,15 @@ handle_p2p_elements (void *cls,
                               "# repeated elements",
                               1,
                               GNUNET_NO);
+    ke->received = GNUNET_YES;
     GNUNET_free (ee);
   }
   else
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Registering new element from remote peer\n");
-    op_register_element (op, ee);
+    op->state->received_fresh += 1;
+    op_register_element (op, ee, GNUNET_YES);
     /* only send results immediately if the client wants it */
     switch (op->spec->result_mode)
     {
@@ -1241,10 +1472,117 @@ handle_p2p_elements (void *cls,
     }
   }
 
+  if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
+  {
+    /* The other peer gave us lots of old elements, there's something wrong. */
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+
   maybe_finish (op);
 }
 
 
+/**
+ * Handle an element message from a remote peer.
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+static void
+handle_p2p_full_element (void *cls,
+                         const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+  struct ElementEntry *ee;
+  const struct GNUNET_SET_ElementMessage *emsg;
+  uint16_t element_size;
+
+  if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
+  {
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+
+  emsg = (const struct GNUNET_SET_ElementMessage *) mh;
+
+  element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
+  ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
+  GNUNET_memcpy (&ee[1], &emsg[1], element_size);
+  ee->element.size = element_size;
+  ee->element.data = &ee[1];
+  ee->element.element_type = ntohs (emsg->element_type);
+  ee->remote = GNUNET_YES;
+  GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Got element (full diff, size %u, hash %s) from peer\n",
+       (unsigned int) element_size,
+       GNUNET_h2s (&ee->element_hash));
+
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# received elements",
+                            1,
+                            GNUNET_NO);
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# exchanged elements",
+                            1,
+                            GNUNET_NO);
+
+  op->state->received_total += 1;
+
+  struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+
+  if (NULL != ke)
+  {
+    /* Got repeated element.  Should not happen since
+     * we track demands. */
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# repeated elements",
+                              1,
+                              GNUNET_NO);
+    ke->received = GNUNET_YES;
+    GNUNET_free (ee);
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Registering new element from remote peer\n");
+    op->state->received_fresh += 1;
+    op_register_element (op, ee, GNUNET_YES);
+    /* only send results immediately if the client wants it */
+    switch (op->spec->result_mode)
+    {
+      case GNUNET_SET_RESULT_ADDED:
+        send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
+        break;
+      case GNUNET_SET_RESULT_SYMMETRIC:
+        send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
+        break;
+      default:
+        /* Result mode not supported, should have been caught earlier. */
+        GNUNET_break (0);
+        break;
+    }
+  }
+
+  if ( (GNUNET_YES == op->spec->byzantine) && 
+       (op->state->received_total > 384 + op->state->received_fresh * 4) && 
+       (op->state->received_fresh < op->state->received_total / 6) )
+  {
+    /* The other peer gave us lots of old elements, there's something wrong. */
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         "Other peer sent only %llu/%llu fresh elements, failing operation\n",
+         (unsigned long long) op->state->received_fresh,
+         (unsigned long long) op->state->received_total);
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+}
+
 /**
  * Send offers (for GNUNET_Hash-es) in response
  * to inquiries (for IBF_Key-s).
@@ -1259,6 +1597,7 @@ handle_p2p_inquiry (void *cls,
   struct Operation *op = cls;
   const struct IBF_Key *ibf_key;
   unsigned int num_keys;
+  struct InquiryMessage *msg;
 
   /* look up elements and send them */
   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
@@ -1267,9 +1606,9 @@ handle_p2p_inquiry (void *cls,
     fail_union_operation (op);
     return;
   }
-  num_keys = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+  num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
       / sizeof (struct IBF_Key);
-  if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+  if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
       != num_keys * sizeof (struct IBF_Key))
   {
     GNUNET_break_op (0);
@@ -1277,17 +1616,130 @@ handle_p2p_inquiry (void *cls,
     return;
   }
 
-  ibf_key = (const struct IBF_Key *) &mh[1];
+  msg = (struct InquiryMessage *) mh;
+
+  ibf_key = (const struct IBF_Key *) &msg[1];
   while (0 != num_keys--)
   {
-    send_offers_for_key (op, *ibf_key);
+    struct IBF_Key unsalted_key;
+    unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
+    send_offers_for_key (op, unsalted_key);
     ibf_key++;
   }
 }
 
 
 /**
- * FIXME
+ * Iterator over hash map entries, called to
+ * destroy the linked list of colliding ibf key entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return #GNUNET_YES if we should continue to iterate,
+ *         #GNUNET_NO if not.
+ */
+static int
+send_missing_elements_iter (void *cls,
+                            uint32_t key,
+                            void *value)
+{
+  struct Operation *op = cls;
+  struct KeyEntry *ke = value;
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_SET_ElementMessage *emsg;
+  struct ElementEntry *ee = ke->element;
+
+  if (GNUNET_YES == ke->received)
+    return GNUNET_YES;
+
+  ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
+  GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
+  emsg->reserved = htons (0);
+  emsg->element_type = htons (ee->element.element_type);
+  GNUNET_MQ_send (op->mq, ev);
+
+  return GNUNET_YES;
+}
+
+
+/**
+ * Handle a 
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ */
+static void
+handle_p2p_request_full (void *cls,
+                         const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+
+  if (PHASE_EXPECT_IBF != op->state->phase)
+  {
+    fail_union_operation (op);
+    GNUNET_break_op (0);
+    return;
+  }
+
+  // FIXME: we need to check that our set is larger than the
+  // byzantine_lower_bound by some threshold
+  send_full_set (op);
+}
+
+
+/**
+ * Handle a "full done" message.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ */
+static void
+handle_p2p_full_done (void *cls,
+                      const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+
+  if (PHASE_EXPECT_IBF == op->state->phase)
+  {
+    struct GNUNET_MQ_Envelope *ev;
+
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
+
+    /* send all the elements that did not come from the remote peer */
+    GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+                                             &send_missing_elements_iter,
+                                             op);
+
+    ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+    GNUNET_MQ_send (op->mq, ev);
+    op->state->phase = PHASE_DONE;
+
+    /* we now wait until the other peer shuts the tunnel down*/
+  }
+  else if (PHASE_FULL_SENDING == op->state->phase)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
+    /* We sent the full set, and got the response for that.  We're done. */
+    op->state->phase = PHASE_DONE;
+    send_done_and_destroy (op);
+  }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+}
+
+
+/**
+ * Handle a demand by the other peer for elements based on a list
+ * of GNUNET_HashCode-s.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
  */
 static void
 handle_p2p_demand (void *cls,
@@ -1330,7 +1782,7 @@ handle_p2p_demand (void *cls,
       return;
     }
     ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
-    memcpy (&emsg[1], ee->element.data, ee->element.size);
+    GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
     emsg->reserved = htons (0);
     emsg->element_type = htons (ee->element.element_type);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1339,6 +1791,10 @@ handle_p2p_demand (void *cls,
          (unsigned int) ee->element.size,
          GNUNET_h2s (&ee->element_hash));
     GNUNET_MQ_send (op->mq, ev);
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# exchanged elements",
+                              1,
+                              GNUNET_NO);
 
     switch (op->spec->result_mode)
     {
@@ -1504,8 +1960,13 @@ union_evaluate (struct Operation *op,
   op->state->se = strata_estimator_dup (op->spec->set->state->se);
   /* we started the operation, thus we have to send the operation request */
   op->state->phase = PHASE_EXPECT_SE;
+  op->state->salt_receive = op->state->salt_send = 42;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Initiating union operation evaluation\n");
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# of total union operations",
+                            1,
+                            GNUNET_NO);
   GNUNET_STATISTICS_update (_GSS_statistics,
                             "# of initiated union operations",
                             1,
@@ -1517,11 +1978,10 @@ union_evaluate (struct Operation *op,
   {
     /* the context message is too large */
     GNUNET_break (0);
-    GNUNET_SERVER_client_disconnect (op->spec->set->client);
+    GNUNET_SERVICE_client_drop (op->spec->set->client);
     return;
   }
   msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
-  msg->app_id = op->spec->app_id;
   GNUNET_MQ_send (op->mq,
                   ev);
 
@@ -1531,6 +1991,9 @@ union_evaluate (struct Operation *op,
   else
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "sent op request without context message\n");
+
+  initialize_key_to_element (op);
+  op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
 }
 
 
@@ -1551,10 +2014,17 @@ union_accept (struct Operation *op)
                             "# of accepted union operations",
                             1,
                             GNUNET_NO);
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# of total union operations",
+                            1,
+                            GNUNET_NO);
 
   op->state = GNUNET_new (struct OperationState);
   op->state->se = strata_estimator_dup (op->spec->set->state->se);
   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
+  op->state->salt_receive = op->state->salt_send = 42;
+  initialize_key_to_element (op);
+  op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
   /* kick off the operation */
   send_strata_estimator (op);
 }
@@ -1599,7 +2069,7 @@ static void
 union_add (struct SetState *set_state, struct ElementEntry *ee)
 {
   strata_estimator_insert (set_state->se,
-                           get_ibf_key (&ee->element_hash, 0));
+                           get_ibf_key (&ee->element_hash));
 }
 
 
@@ -1614,7 +2084,7 @@ static void
 union_remove (struct SetState *set_state, struct ElementEntry *ee)
 {
   strata_estimator_remove (set_state->se,
-                           get_ibf_key (&ee->element_hash, 0));
+                           get_ibf_key (&ee->element_hash));
 }
 
 
@@ -1662,6 +2132,9 @@ union_handle_p2p_message (struct Operation *op,
     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
       handle_p2p_elements (op, mh);
       break;
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
+      handle_p2p_full_element (op, mh);
+      break;
     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
       handle_p2p_inquiry (op, mh);
       break;
@@ -1674,6 +2147,12 @@ union_handle_p2p_message (struct Operation *op,
     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
       handle_p2p_demand (op, mh);
       break;
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
+      handle_p2p_full_done (op, mh);
+      break;
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
+      handle_p2p_request_full (op, mh);
+      break;
     default:
       /* Something wrong with cadet's message handlers? */
       GNUNET_assert (0);