+/**
+ * Iterator over hash map entries, called to
+ * destroy the linked list of colliding ibf key entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return #GNUNET_YES if we should continue to iterate,
+ * #GNUNET_NO if not.
+ */
+static int
+send_missing_elements_iter (void *cls,
+ uint32_t key,
+ void *value)
+{
+ struct Operation *op = cls;
+ struct KeyEntry *ke = value;
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_SET_ElementMessage *emsg;
+ struct ElementEntry *ee = ke->element;
+
+ if (GNUNET_YES == ke->received)
+ return GNUNET_YES;
+
+ ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
+ GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
+ emsg->reserved = htons (0);
+ emsg->element_type = htons (ee->element.element_type);
+ GNUNET_MQ_send (op->mq, ev);
+
+ return GNUNET_YES;
+}
+
+
+/**
+ * Handle a
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ */
+static void
+handle_p2p_request_full (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+
+ if (PHASE_EXPECT_IBF != op->state->phase)
+ {
+ fail_union_operation (op);
+ GNUNET_break_op (0);
+ return;
+ }
+
+ // FIXME: we need to check that our set is larger than the
+ // byzantine_lower_bound by some threshold
+ send_full_set (op);
+}
+
+
+/**
+ * Handle a "full done" message.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ */
+static void
+handle_p2p_full_done (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+
+ if (PHASE_EXPECT_IBF == op->state->phase)
+ {
+ struct GNUNET_MQ_Envelope *ev;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
+
+ /* send all the elements that did not come from the remote peer */
+ GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+ &send_missing_elements_iter,
+ op);
+
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+ GNUNET_MQ_send (op->mq, ev);
+ op->state->phase = PHASE_DONE;
+
+ /* we now wait until the other peer shuts the tunnel down*/
+ }
+ else if (PHASE_FULL_SENDING == op->state->phase)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
+ /* We sent the full set, and got the response for that. We're done. */
+ op->state->phase = PHASE_DONE;
+ send_done_and_destroy (op);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+}
+
+
+/**
+ * Handle a demand by the other peer for elements based on a list
+ * of GNUNET_HashCode-s.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ */
+static void
+handle_p2p_demand (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+ struct ElementEntry *ee;
+ struct GNUNET_SET_ElementMessage *emsg;
+ const struct GNUNET_HashCode *hash;
+ unsigned int num_hashes;
+ struct GNUNET_MQ_Envelope *ev;
+
+ num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+ / sizeof (struct GNUNET_HashCode);
+ if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+ != num_hashes * sizeof (struct GNUNET_HashCode))
+ {
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+
+ for (hash = (const struct GNUNET_HashCode *) &mh[1];
+ num_hashes > 0;
+ hash++, num_hashes--)
+ {
+ ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
+ if (NULL == ee)
+ {
+ /* Demand for non-existing element. */
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+ if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
+ {
+ /* Probably confused lazily copied sets. */
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+ ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
+ GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
+ emsg->reserved = htons (0);
+ emsg->element_type = htons (ee->element.element_type);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
+ (void *) op,
+ (unsigned int) ee->element.size,
+ GNUNET_h2s (&ee->element_hash));
+ GNUNET_MQ_send (op->mq, ev);
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# exchanged elements",
+ 1,
+ GNUNET_NO);
+
+ switch (op->spec->result_mode)
+ {
+ case GNUNET_SET_RESULT_ADDED:
+ /* Nothing to do. */
+ break;
+ case GNUNET_SET_RESULT_SYMMETRIC:
+ send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
+ break;
+ default:
+ /* Result mode not supported, should have been caught earlier. */
+ GNUNET_break (0);
+ break;
+ }
+ }
+}
+
+
+/**
+ * Handle offers (of GNUNET_HashCode-s) and
+ * respond with demands (of GNUNET_HashCode-s).
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+static void
+handle_p2p_offer (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+ const struct GNUNET_HashCode *hash;
+ unsigned int num_hashes;
+
+ /* look up elements and send them */
+ if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
+ (op->state->phase != PHASE_INVENTORY_ACTIVE))
+ {
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+ num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+ / sizeof (struct GNUNET_HashCode);
+ if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+ != num_hashes * sizeof (struct GNUNET_HashCode))
+ {
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+
+ for (hash = (const struct GNUNET_HashCode *) &mh[1];
+ num_hashes > 0;
+ hash++, num_hashes--)
+ {
+ struct ElementEntry *ee;
+ struct GNUNET_MessageHeader *demands;
+ struct GNUNET_MQ_Envelope *ev;
+
+ ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
+ hash);
+ if (NULL != ee)
+ if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
+ continue;
+
+ if (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
+ hash))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Skipped sending duplicate demand\n");
+ continue;
+ }
+
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (op->state->demanded_hashes,
+ hash,
+ NULL,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "[OP %x] Requesting element (hash %s)\n",
+ (void *) op, GNUNET_h2s (hash));
+ ev = GNUNET_MQ_msg_header_extra (demands,
+ sizeof (struct GNUNET_HashCode),
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
+ *(struct GNUNET_HashCode *) &demands[1] = *hash;
+ GNUNET_MQ_send (op->mq, ev);
+ }
+}
+
+