implement union via sending whole set
authorFlorian Dold <florian.dold@gmail.com>
Thu, 23 Feb 2017 16:13:39 +0000 (17:13 +0100)
committerFlorian Dold <florian.dold@gmail.com>
Thu, 23 Feb 2017 16:47:40 +0000 (17:47 +0100)
src/include/gnunet_protocols.h
src/set/gnunet-service-set.c
src/set/gnunet-service-set_union.c
src/set/test_set.conf

index a10c0ca5d1a8d99eed73caefda55afba724810f8..f478edd27f725c4109707e0b8a5403f7a0f304be 100644 (file)
@@ -1800,7 +1800,13 @@ extern "C"
  * based on their sets and the elements we previously sent
  * with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS.
  */
-#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_GET_MISSING 597
+#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE 597
+
+/**
+ * Send a set element, not as response to a demand but because
+ * we're sending the full set.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT 598
 
 
 /*******************************************************************************
index a545e8a066b17d9b61a99d1889241fd957159a26..1072407f19714a2bcd95913dc612ab3a63837a5b 100644 (file)
@@ -1370,6 +1370,10 @@ handle_client_listen (void *cls,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
                            struct GNUNET_MessageHeader,
                            NULL),
+    GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+                           struct GNUNET_MessageHeader,
+                           NULL),
     GNUNET_MQ_hd_var_size (p2p_message,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
                            struct GNUNET_MessageHeader,
@@ -1378,6 +1382,10 @@ handle_client_listen (void *cls,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
                            struct GNUNET_MessageHeader,
                            NULL),
+    GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
+                           struct GNUNET_MessageHeader,
+                           NULL),
     GNUNET_MQ_hd_var_size (p2p_message,
                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
                            struct GNUNET_MessageHeader,
@@ -1633,6 +1641,14 @@ handle_client_evaluate (void *cls,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
                            struct GNUNET_MessageHeader,
                            op),
+    GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+                           struct GNUNET_MessageHeader,
+                           op),
+    GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
+                           struct GNUNET_MessageHeader,
+                           op),
     GNUNET_MQ_hd_var_size (p2p_message,
                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
                            struct GNUNET_MessageHeader,
index 137216ed7bcc543d4a365d9d3f5f3008e9b37f15..d2dfe049b16e960e98e84bb3518c6ec315edde25 100644 (file)
@@ -115,14 +115,22 @@ enum UnionOperationPhase
    * In the penultimate phase,
    * we wait until all our demands
    * are satisfied.  Then we send a done
-   * message, and wait for another done message.*/
+   * message, and wait for another done message.
+   */
   PHASE_FINISH_WAITING,
 
   /**
    * In the ultimate phase, we wait until
    * our demands are satisfied and then
-   * quit (sending another DONE message). */
-  PHASE_DONE
+   * quit (sending another DONE message).
+   */
+  PHASE_DONE,
+
+  /**
+   * After sending the full set, wait for responses with the elements
+   * that the local peer is missing.
+   */
+  PHASE_FULL_SENDING,
 };
 
 
@@ -214,6 +222,14 @@ struct KeyEntry
    * is #GNUNET_YES.
    */
   struct ElementEntry *element;
+
+  /**
+   * Did we receive this element?
+   * Even if element->is_foreign is false, we might
+   * have received the element, so this indicates that
+   * the other peer has it.
+   */
+  int received;
 };
 
 
@@ -372,6 +388,16 @@ get_ibf_key (const struct GNUNET_HashCode *src)
 }
 
 
+/**
+ * Context for #op_get_element_iterator
+ */
+struct GetElementContext
+{
+  struct GNUNET_HashCode hash;
+  struct KeyEntry *k;
+};
+
+
 /**
  * Iterator over the mapping from IBF keys to element entries.  Checks if we
  * have an element with a given GNUNET_HashCode.
@@ -383,17 +409,20 @@ get_ibf_key (const struct GNUNET_HashCode *src)
  *         #GNUNET_NO if we've found the element.
  */
 static int
-op_has_element_iterator (void *cls,
+op_get_element_iterator (void *cls,
                          uint32_t key,
                          void *value)
 {
-  struct GNUNET_HashCode *element_hash = cls;
+  struct GetElementContext *ctx = cls;
   struct KeyEntry *k = value;
 
   GNUNET_assert (NULL != k);
   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
-                                   element_hash))
+                                   &ctx->hash))
+  {
+    ctx->k = k;
     return GNUNET_NO;
+  }
   return GNUNET_YES;
 }
 
@@ -406,23 +435,29 @@ op_has_element_iterator (void *cls,
  * @param element_hash hash of the element to look for
  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
  */
-static int
-op_has_element (struct Operation *op,
+static struct KeyEntry *
+op_get_element (struct Operation *op,
                 const struct GNUNET_HashCode *element_hash)
 {
   int ret;
   struct IBF_Key ibf_key;
+  struct GetElementContext ctx = { 0 };
+
+  ctx.hash = *element_hash;
 
   ibf_key = get_ibf_key (element_hash);
   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
                                                       (uint32_t) ibf_key.key_val,
-                                                      op_has_element_iterator,
-                                                      (void *) element_hash);
+                                                      op_get_element_iterator,
+                                                      &ctx);
 
   /* was the iteration aborted because we found the element? */
   if (GNUNET_SYSERR == ret)
-    return GNUNET_YES;
-  return GNUNET_NO;
+  {
+    GNUNET_assert (NULL != ctx.k);
+    return ctx.k;
+  }
+  return NULL;
 }
 
 
@@ -438,10 +473,12 @@ op_has_element (struct Operation *op,
  *
  * @param op the union operation
  * @param ee the element entry
+ * @parem received was this element received from the remote peer?
  */
 static void
 op_register_element (struct Operation *op,
-                     struct ElementEntry *ee)
+                     struct ElementEntry *ee,
+                     int received)
 {
   struct IBF_Key ibf_key;
   struct KeyEntry *k;
@@ -450,6 +487,7 @@ op_register_element (struct Operation *op,
   k = GNUNET_new (struct KeyEntry);
   k->element = ee;
   k->ibf_key = ibf_key;
+  k->received = received;
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
                                                       (uint32_t) ibf_key.key_val,
@@ -535,11 +573,29 @@ init_key_to_element_iterator (void *cls,
 
   GNUNET_assert (GNUNET_NO == ee->remote);
 
-  op_register_element (op, ee);
+  op_register_element (op, ee, GNUNET_NO);
   return GNUNET_YES;
 }
 
 
+/**
+ * Initialize the IBF key to element mapping local to this set
+ * operation.
+ *
+ * @param op the set union operation
+ */
+static void
+initialize_key_to_element (struct Operation *op)
+{
+  unsigned int len;
+
+  GNUNET_assert (NULL == op->state->key_to_element);
+  len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
+  op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
+  GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op);
+}
+
+
 /**
  * Create an ibf with the operation's elements
  * of the specified size
@@ -552,15 +608,8 @@ static int
 prepare_ibf (struct Operation *op,
              uint32_t size)
 {
-  if (NULL == op->state->key_to_element)
-  {
-    unsigned int len;
+  GNUNET_assert (NULL != op->state->key_to_element);
 
-    len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
-    op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
-    GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
-                                           init_key_to_element_iterator, op);
-  }
   if (NULL != op->state->local_ibf)
     ibf_destroy (op->state->local_ibf);
   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
@@ -708,6 +757,47 @@ get_order_from_difference (unsigned int diff)
 }
 
 
+/**
+ * Send a set element.
+ *
+ * @param cls the union operation `struct Operation *`
+ * @param key unused
+ * @param value the `struct ElementEntry *` to insert
+ *        into the key-to-element mapping
+ * @return #GNUNET_YES (to continue iterating)
+ */
+static int
+send_element_iterator (void *cls,
+                       const struct GNUNET_HashCode *key,
+                       void *value)
+{
+  struct Operation *op = cls;
+  struct GNUNET_SET_ElementMessage *emsg;
+  struct GNUNET_SET_Element *el = value;
+  struct GNUNET_MQ_Envelope *ev;
+
+  ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
+  emsg->element_type = htonl (el->element_type);
+  GNUNET_memcpy (&emsg[1], el->data, el->size);
+  GNUNET_MQ_send (op->mq, ev);
+  return GNUNET_YES;
+}
+
+
+static void
+send_full_set (struct Operation *op)
+{
+  struct GNUNET_MQ_Envelope *ev;
+
+  op->state->phase = PHASE_FULL_SENDING;
+
+  (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
+                                                &send_element_iterator, op);
+  ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+  GNUNET_MQ_send (op->mq, ev);
+}
+
+
 /**
  * Handle a strata estimator from a remote peer
  *
@@ -776,16 +866,29 @@ handle_p2p_strata_estimator (void *cls,
        "got se diff=%d, using ibf size %d\n",
        diff,
        1<<get_order_from_difference (diff));
-  if (GNUNET_OK !=
-      send_ibf (op,
-                get_order_from_difference (diff)))
+
+  if (diff > GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements) / 2)
   {
-    /* Internal error, best we can do is shut the connection */
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Failed to send IBF, closing connection\n");
-    fail_union_operation (op);
-    return GNUNET_SYSERR;
+    LOG (GNUNET_ERROR_TYPE_INFO,
+         "Sending full set (diff=%d, own set=%u)\n",
+         diff,
+         GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
+    send_full_set (op);
+  }
+  else
+  {
+    if (GNUNET_OK !=
+        send_ibf (op,
+                  get_order_from_difference (diff)))
+    {
+      /* Internal error, best we can do is shut the connection */
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Failed to send IBF, closing connection\n");
+      fail_union_operation (op);
+      return GNUNET_SYSERR;
+    }
   }
+
   return GNUNET_OK;
 }
 
@@ -1288,7 +1391,9 @@ handle_p2p_elements (void *cls,
 
   op->state->received_total += 1;
 
-  if (GNUNET_YES == op_has_element (op, &ee->element_hash))
+  struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+
+  if (NULL != ke)
   {
     /* Got repeated element.  Should not happen since
      * we track demands. */
@@ -1296,6 +1401,7 @@ handle_p2p_elements (void *cls,
                               "# repeated elements",
                               1,
                               GNUNET_NO);
+    ke->received = GNUNET_YES;
     GNUNET_free (ee);
   }
   else
@@ -1303,7 +1409,7 @@ handle_p2p_elements (void *cls,
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Registering new element from remote peer\n");
     op->state->received_fresh += 1;
-    op_register_element (op, ee);
+    op_register_element (op, ee, GNUNET_YES);
     /* only send results immediately if the client wants it */
     switch (op->spec->result_mode)
     {
@@ -1332,6 +1438,99 @@ handle_p2p_elements (void *cls,
 }
 
 
+/**
+ * Handle an element message from a remote peer.
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+static void
+handle_p2p_full_element (void *cls,
+                         const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+  struct ElementEntry *ee;
+  const struct GNUNET_SET_ElementMessage *emsg;
+  uint16_t element_size;
+
+  if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
+  {
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+
+  emsg = (const struct GNUNET_SET_ElementMessage *) mh;
+
+  element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
+  ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
+  GNUNET_memcpy (&ee[1], &emsg[1], element_size);
+  ee->element.size = element_size;
+  ee->element.data = &ee[1];
+  ee->element.element_type = ntohs (emsg->element_type);
+  ee->remote = GNUNET_YES;
+  GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Got element (full diff, size %u, hash %s) from peer\n",
+       (unsigned int) element_size,
+       GNUNET_h2s (&ee->element_hash));
+
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# received elements",
+                            1,
+                            GNUNET_NO);
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# exchanged elements",
+                            1,
+                            GNUNET_NO);
+
+  op->state->received_total += 1;
+
+  struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+
+  if (NULL != ke)
+  {
+    /* Got repeated element.  Should not happen since
+     * we track demands. */
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# repeated elements",
+                              1,
+                              GNUNET_NO);
+    ke->received = GNUNET_YES;
+    GNUNET_free (ee);
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Registering new element from remote peer\n");
+    op->state->received_fresh += 1;
+    op_register_element (op, ee, GNUNET_YES);
+    /* only send results immediately if the client wants it */
+    switch (op->spec->result_mode)
+    {
+      case GNUNET_SET_RESULT_ADDED:
+        send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
+        break;
+      case GNUNET_SET_RESULT_SYMMETRIC:
+        send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
+        break;
+      default:
+        /* Result mode not supported, should have been caught earlier. */
+        GNUNET_break (0);
+        break;
+    }
+  }
+
+  if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
+  {
+    /* The other peer gave us lots of old elements, there's something wrong. */
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+}
+
 /**
  * Send offers (for GNUNET_Hash-es) in response
  * to inquiries (for IBF_Key-s).
@@ -1378,6 +1577,85 @@ handle_p2p_inquiry (void *cls,
 }
 
 
+/**
+ * Iterator over hash map entries, called to
+ * destroy the linked list of colliding ibf key entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return #GNUNET_YES if we should continue to iterate,
+ *         #GNUNET_NO if not.
+ */
+static int
+send_missing_elements_iter (void *cls,
+                            uint32_t key,
+                            void *value)
+{
+  struct Operation *op = cls;
+  struct KeyEntry *ke = value;
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_SET_ElementMessage *emsg;
+  struct ElementEntry *ee = ke->element;
+
+  if (GNUNET_YES == ke->received)
+    return GNUNET_YES;
+
+  ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
+  GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
+  emsg->reserved = htons (0);
+  emsg->element_type = htons (ee->element.element_type);
+  GNUNET_MQ_send (op->mq, ev);
+
+  return GNUNET_YES;
+}
+
+/**
+ * Handle a "full done" message.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ */
+static void
+handle_p2p_full_done (void *cls,
+                      const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+
+  if (PHASE_EXPECT_IBF == op->state->phase)
+  {
+    struct GNUNET_MQ_Envelope *ev;
+
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
+
+    /* send all the elements that did not come from the remote peer */
+    GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+                                             &send_missing_elements_iter,
+                                             op);
+
+    ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+    GNUNET_MQ_send (op->mq, ev);
+    op->state->phase = PHASE_DONE;
+
+    /* we now wait until the other peer shuts the tunnel down*/
+  }
+  else if (PHASE_FULL_SENDING == op->state->phase)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
+    /* We sent the full set, and got the response for that.  We're done. */
+    op->state->phase = PHASE_DONE;
+    send_done_and_destroy (op);
+  }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+}
+
+
 /**
  * Handle a demand by the other peer for elements based on a list
  * of GNUNET_HashCode-s.
@@ -1635,6 +1913,8 @@ union_evaluate (struct Operation *op,
   else
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "sent op request without context message\n");
+
+  initialize_key_to_element (op);
 }
 
 
@@ -1664,6 +1944,7 @@ union_accept (struct Operation *op)
   op->state->se = strata_estimator_dup (op->spec->set->state->se);
   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
   op->state->salt_receive = op->state->salt_send = 42;
+  initialize_key_to_element (op);
   /* kick off the operation */
   send_strata_estimator (op);
 }
@@ -1771,6 +2052,9 @@ union_handle_p2p_message (struct Operation *op,
     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
       handle_p2p_elements (op, mh);
       break;
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
+      handle_p2p_full_element (op, mh);
+      break;
     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
       handle_p2p_inquiry (op, mh);
       break;
@@ -1783,6 +2067,9 @@ union_handle_p2p_message (struct Operation *op,
     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
       handle_p2p_demand (op, mh);
       break;
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
+      handle_p2p_full_done (op, mh);
+      break;
     default:
       /* Something wrong with cadet's message handlers? */
       GNUNET_assert (0);
index 69e7f5c52a8d80516a12f8d74a0c82967bf97d2b..30ccbde5558dd9275aeaf252e5c14dec92f80025 100644 (file)
@@ -5,7 +5,7 @@ GNUNET_TEST_HOME = /tmp/test-gnunet-set/
 
 [set]
 AUTOSTART = YES
-PREFIX = valgrind
+PREFIX = valgrind
 #PREFIX = valgrind --leak-check=full
 #PREFIX = gdbserver :1234
 OPTIONS = -L INFO