* In the penultimate phase,
* we wait until all our demands
* are satisfied. Then we send a done
- * message, and wait for another done message.*/
+ * message, and wait for another done message.
+ */
PHASE_FINISH_WAITING,
/**
* In the ultimate phase, we wait until
* our demands are satisfied and then
- * quit (sending another DONE message). */
- PHASE_DONE
+ * quit (sending another DONE message).
+ */
+ PHASE_DONE,
+
+ /**
+ * After sending the full set, wait for responses with the elements
+ * that the local peer is missing.
+ */
+ PHASE_FULL_SENDING,
};
* is #GNUNET_YES.
*/
struct ElementEntry *element;
+
+ /**
+ * Did we receive this element?
+ * Even if element->is_foreign is false, we might
+ * have received the element, so this indicates that
+ * the other peer has it.
+ */
+ int received;
};
}
+/**
+ * Context for #op_get_element_iterator
+ */
+struct GetElementContext
+{
+ struct GNUNET_HashCode hash;
+ struct KeyEntry *k;
+};
+
+
/**
* Iterator over the mapping from IBF keys to element entries. Checks if we
* have an element with a given GNUNET_HashCode.
* #GNUNET_NO if we've found the element.
*/
static int
-op_has_element_iterator (void *cls,
+op_get_element_iterator (void *cls,
uint32_t key,
void *value)
{
- struct GNUNET_HashCode *element_hash = cls;
+ struct GetElementContext *ctx = cls;
struct KeyEntry *k = value;
GNUNET_assert (NULL != k);
if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
- element_hash))
+ &ctx->hash))
+ {
+ ctx->k = k;
return GNUNET_NO;
+ }
return GNUNET_YES;
}
* @param element_hash hash of the element to look for
* @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
*/
-static int
-op_has_element (struct Operation *op,
+static struct KeyEntry *
+op_get_element (struct Operation *op,
const struct GNUNET_HashCode *element_hash)
{
int ret;
struct IBF_Key ibf_key;
+ struct GetElementContext ctx = { 0 };
+
+ ctx.hash = *element_hash;
ibf_key = get_ibf_key (element_hash);
ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
(uint32_t) ibf_key.key_val,
- op_has_element_iterator,
- (void *) element_hash);
+ op_get_element_iterator,
+ &ctx);
/* was the iteration aborted because we found the element? */
if (GNUNET_SYSERR == ret)
- return GNUNET_YES;
- return GNUNET_NO;
+ {
+ GNUNET_assert (NULL != ctx.k);
+ return ctx.k;
+ }
+ return NULL;
}
*
* @param op the union operation
* @param ee the element entry
+ * @parem received was this element received from the remote peer?
*/
static void
op_register_element (struct Operation *op,
- struct ElementEntry *ee)
+ struct ElementEntry *ee,
+ int received)
{
struct IBF_Key ibf_key;
struct KeyEntry *k;
k = GNUNET_new (struct KeyEntry);
k->element = ee;
k->ibf_key = ibf_key;
+ k->received = received;
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
(uint32_t) ibf_key.key_val,
GNUNET_assert (GNUNET_NO == ee->remote);
- op_register_element (op, ee);
+ op_register_element (op, ee, GNUNET_NO);
return GNUNET_YES;
}
+/**
+ * Initialize the IBF key to element mapping local to this set
+ * operation.
+ *
+ * @param op the set union operation
+ */
+static void
+initialize_key_to_element (struct Operation *op)
+{
+ unsigned int len;
+
+ GNUNET_assert (NULL == op->state->key_to_element);
+ len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
+ op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
+ GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op);
+}
+
+
/**
* Create an ibf with the operation's elements
* of the specified size
prepare_ibf (struct Operation *op,
uint32_t size)
{
- if (NULL == op->state->key_to_element)
- {
- unsigned int len;
+ GNUNET_assert (NULL != op->state->key_to_element);
- len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
- op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
- GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
- init_key_to_element_iterator, op);
- }
if (NULL != op->state->local_ibf)
ibf_destroy (op->state->local_ibf);
op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
}
+/**
+ * Send a set element.
+ *
+ * @param cls the union operation `struct Operation *`
+ * @param key unused
+ * @param value the `struct ElementEntry *` to insert
+ * into the key-to-element mapping
+ * @return #GNUNET_YES (to continue iterating)
+ */
+static int
+send_element_iterator (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct Operation *op = cls;
+ struct GNUNET_SET_ElementMessage *emsg;
+ struct GNUNET_SET_Element *el = value;
+ struct GNUNET_MQ_Envelope *ev;
+
+ ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
+ emsg->element_type = htonl (el->element_type);
+ GNUNET_memcpy (&emsg[1], el->data, el->size);
+ GNUNET_MQ_send (op->mq, ev);
+ return GNUNET_YES;
+}
+
+
+static void
+send_full_set (struct Operation *op)
+{
+ struct GNUNET_MQ_Envelope *ev;
+
+ op->state->phase = PHASE_FULL_SENDING;
+
+ (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
+ &send_element_iterator, op);
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+ GNUNET_MQ_send (op->mq, ev);
+}
+
+
/**
* Handle a strata estimator from a remote peer
*
"got se diff=%d, using ibf size %d\n",
diff,
1<<get_order_from_difference (diff));
- if (GNUNET_OK !=
- send_ibf (op,
- get_order_from_difference (diff)))
+
+ if (diff > GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements) / 2)
{
- /* Internal error, best we can do is shut the connection */
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to send IBF, closing connection\n");
- fail_union_operation (op);
- return GNUNET_SYSERR;
+ LOG (GNUNET_ERROR_TYPE_INFO,
+ "Sending full set (diff=%d, own set=%u)\n",
+ diff,
+ GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
+ send_full_set (op);
+ }
+ else
+ {
+ if (GNUNET_OK !=
+ send_ibf (op,
+ get_order_from_difference (diff)))
+ {
+ /* Internal error, best we can do is shut the connection */
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to send IBF, closing connection\n");
+ fail_union_operation (op);
+ return GNUNET_SYSERR;
+ }
}
+
return GNUNET_OK;
}
op->state->received_total += 1;
- if (GNUNET_YES == op_has_element (op, &ee->element_hash))
+ struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+
+ if (NULL != ke)
{
/* Got repeated element. Should not happen since
* we track demands. */
"# repeated elements",
1,
GNUNET_NO);
+ ke->received = GNUNET_YES;
GNUNET_free (ee);
}
else
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Registering new element from remote peer\n");
op->state->received_fresh += 1;
- op_register_element (op, ee);
+ op_register_element (op, ee, GNUNET_YES);
/* only send results immediately if the client wants it */
switch (op->spec->result_mode)
{
}
+/**
+ * Handle an element message from a remote peer.
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+static void
+handle_p2p_full_element (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+ struct ElementEntry *ee;
+ const struct GNUNET_SET_ElementMessage *emsg;
+ uint16_t element_size;
+
+ if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
+ {
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+
+ emsg = (const struct GNUNET_SET_ElementMessage *) mh;
+
+ element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
+ ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
+ GNUNET_memcpy (&ee[1], &emsg[1], element_size);
+ ee->element.size = element_size;
+ ee->element.data = &ee[1];
+ ee->element.element_type = ntohs (emsg->element_type);
+ ee->remote = GNUNET_YES;
+ GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got element (full diff, size %u, hash %s) from peer\n",
+ (unsigned int) element_size,
+ GNUNET_h2s (&ee->element_hash));
+
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# received elements",
+ 1,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# exchanged elements",
+ 1,
+ GNUNET_NO);
+
+ op->state->received_total += 1;
+
+ struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+
+ if (NULL != ke)
+ {
+ /* Got repeated element. Should not happen since
+ * we track demands. */
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# repeated elements",
+ 1,
+ GNUNET_NO);
+ ke->received = GNUNET_YES;
+ GNUNET_free (ee);
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Registering new element from remote peer\n");
+ op->state->received_fresh += 1;
+ op_register_element (op, ee, GNUNET_YES);
+ /* only send results immediately if the client wants it */
+ switch (op->spec->result_mode)
+ {
+ case GNUNET_SET_RESULT_ADDED:
+ send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
+ break;
+ case GNUNET_SET_RESULT_SYMMETRIC:
+ send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
+ break;
+ default:
+ /* Result mode not supported, should have been caught earlier. */
+ GNUNET_break (0);
+ break;
+ }
+ }
+
+ if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
+ {
+ /* The other peer gave us lots of old elements, there's something wrong. */
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+}
+
/**
* Send offers (for GNUNET_Hash-es) in response
* to inquiries (for IBF_Key-s).
}
+/**
+ * Iterator over hash map entries, called to
+ * destroy the linked list of colliding ibf key entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return #GNUNET_YES if we should continue to iterate,
+ * #GNUNET_NO if not.
+ */
+static int
+send_missing_elements_iter (void *cls,
+ uint32_t key,
+ void *value)
+{
+ struct Operation *op = cls;
+ struct KeyEntry *ke = value;
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_SET_ElementMessage *emsg;
+ struct ElementEntry *ee = ke->element;
+
+ if (GNUNET_YES == ke->received)
+ return GNUNET_YES;
+
+ ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
+ GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
+ emsg->reserved = htons (0);
+ emsg->element_type = htons (ee->element.element_type);
+ GNUNET_MQ_send (op->mq, ev);
+
+ return GNUNET_YES;
+}
+
+/**
+ * Handle a "full done" message.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ */
+static void
+handle_p2p_full_done (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+
+ if (PHASE_EXPECT_IBF == op->state->phase)
+ {
+ struct GNUNET_MQ_Envelope *ev;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
+
+ /* send all the elements that did not come from the remote peer */
+ GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+ &send_missing_elements_iter,
+ op);
+
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+ GNUNET_MQ_send (op->mq, ev);
+ op->state->phase = PHASE_DONE;
+
+ /* we now wait until the other peer shuts the tunnel down*/
+ }
+ else if (PHASE_FULL_SENDING == op->state->phase)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
+ /* We sent the full set, and got the response for that. We're done. */
+ op->state->phase = PHASE_DONE;
+ send_done_and_destroy (op);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+}
+
+
/**
* Handle a demand by the other peer for elements based on a list
* of GNUNET_HashCode-s.
else
LOG (GNUNET_ERROR_TYPE_DEBUG,
"sent op request without context message\n");
+
+ initialize_key_to_element (op);
}
op->state->se = strata_estimator_dup (op->spec->set->state->se);
op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
op->state->salt_receive = op->state->salt_send = 42;
+ initialize_key_to_element (op);
/* kick off the operation */
send_strata_estimator (op);
}
case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
handle_p2p_elements (op, mh);
break;
+ case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
+ handle_p2p_full_element (op, mh);
+ break;
case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
handle_p2p_inquiry (op, mh);
break;
case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
handle_p2p_demand (op, mh);
break;
+ case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
+ handle_p2p_full_done (op, mh);
+ break;
default:
/* Something wrong with cadet's message handlers? */
GNUNET_assert (0);