/*
This file is part of GNUnet
- (C) 2013, 2014 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2013-2017 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
You should have received a copy of the GNU General Public License
along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
*/
/**
* @file set/gnunet-service-set_intersection.c
#include "gnunet-service-set.h"
#include "gnunet_block_lib.h"
#include "gnunet-service-set_protocol.h"
+#include "gnunet-service-set_intersection.h"
#include <gcrypt.h>
*/
PHASE_BF_EXCHANGE,
+ /**
+ * We must next send the P2P DONE message (after finishing mostly
+ * with the local client). Then we will wait for the channel to close.
+ */
+ PHASE_MUST_SEND_DONE,
+
+ /**
+ * We have received the P2P DONE message, and must finish with the
+ * local client before terminating the channel.
+ */
+ PHASE_DONE_RECEIVED,
+
/**
* The protocol is over. Results may still have to be sent to the
* client.
*/
PHASE_FINISHED
+
};
* Did we send the client that we are done?
*/
int client_done_sent;
+
+ /**
+ * Set whenever we reach the state where the death of the
+ * channel is perfectly find and should NOT result in the
+ * operation being cancelled.
+ */
+ int channel_death_expected;
};
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_SET_ResultMessage *rm;
- if (GNUNET_SET_RESULT_REMOVED != op->spec->result_mode)
+ if (GNUNET_SET_RESULT_REMOVED != op->result_mode)
return; /* Wrong mode for transmitting removed elements */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending removed element (size %u) to client\n",
element->size);
- GNUNET_assert (0 != op->spec->client_request_id);
+ GNUNET_assert (0 != op->client_request_id);
ev = GNUNET_MQ_msg_extra (rm,
element->size,
GNUNET_MESSAGE_TYPE_SET_RESULT);
return;
}
rm->result_status = htons (GNUNET_SET_STATUS_OK);
- rm->request_id = htonl (op->spec->client_request_id);
+ rm->request_id = htonl (op->client_request_id);
rm->element_type = element->element_type;
- memcpy (&rm[1],
- element->data,
- element->size);
- GNUNET_MQ_send (op->spec->set->client_mq,
+ GNUNET_memcpy (&rm[1],
+ element->data,
+ element->size);
+ GNUNET_MQ_send (op->set->cs->mq,
ev);
}
GNUNET_h2s (&ee->element_hash),
ee->element.size);
- if ( (op->generation_created < ee->generation_removed) &&
- (op->generation_created >= ee->generation_added) )
+ if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Reduced initialization, not starting with %s:%u (wrong generation)\n",
ev = GNUNET_MQ_msg (msg,
GNUNET_MESSAGE_TYPE_SET_RESULT);
msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
- msg->request_id = htonl (op->spec->client_request_id);
+ msg->request_id = htonl (op->client_request_id);
msg->element_type = htons (0);
- GNUNET_MQ_send (op->spec->set->client_mq,
+ GNUNET_MQ_send (op->set->cs->mq,
ev);
_GSS_operation_destroy (op,
GNUNET_YES);
should use more bits to maximize its set reduction
potential and minimize overall bandwidth consumption. */
bf_elementbits = 2 + ceil (log2((double)
- (op->spec->remote_element_count /
- (double) op->state->my_element_count)));
+ (op->remote_element_count /
+ (double) op->state->my_element_count)));
if (bf_elementbits < 1)
bf_elementbits = 1; /* make sure k is not 0 */
/* optimize BF-size to ~50% of bits set */
ev = GNUNET_MQ_msg_extra (msg,
chunk_size,
GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
- memcpy (&msg[1],
+ GNUNET_memcpy (&msg[1],
&bf_data[offset],
chunk_size);
offset += chunk_size;
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_SET_ResultMessage *rm;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Intersection succeeded, sending DONE to local client\n");
ev = GNUNET_MQ_msg (rm,
GNUNET_MESSAGE_TYPE_SET_RESULT);
- rm->request_id = htonl (op->spec->client_request_id);
+ rm->request_id = htonl (op->client_request_id);
rm->result_status = htons (GNUNET_SET_STATUS_DONE);
rm->element_type = htons (0);
- GNUNET_MQ_send (op->spec->set->client_mq,
+ GNUNET_MQ_send (op->set->cs->mq,
ev);
_GSS_operation_destroy (op,
GNUNET_YES);
}
+/**
+ * Remember that we are done dealing with the local client
+ * AND have sent the other peer our message that we are done,
+ * so we are not just waiting for the channel to die before
+ * telling the local client that we are done as our last act.
+ *
+ * @param cls the `struct Operation`.
+ */
+static void
+finished_local_operations (void *cls)
+{
+ struct Operation *op = cls;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "DONE sent to other peer, now waiting for other end to close the channel\n");
+ op->state->phase = PHASE_FINISHED;
+ op->state->channel_death_expected = GNUNET_YES;
+}
+
+
+/**
+ * Notify the other peer that we are done. Once this message
+ * is out, we still need to notify the local client that we
+ * are done.
+ *
+ * @param op operation to notify for.
+ */
+static void
+send_p2p_done (struct Operation *op)
+{
+ struct GNUNET_MQ_Envelope *ev;
+ struct IntersectionDoneMessage *idm;
+
+ GNUNET_assert (PHASE_MUST_SEND_DONE == op->state->phase);
+ GNUNET_assert (GNUNET_NO == op->state->channel_death_expected);
+ ev = GNUNET_MQ_msg (idm,
+ GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
+ idm->final_element_count = htonl (op->state->my_element_count);
+ idm->element_xor_hash = op->state->my_xor;
+ GNUNET_MQ_notify_sent (ev,
+ &finished_local_operations,
+ op);
+ GNUNET_MQ_send (op->mq,
+ ev);
+}
+
+
/**
* Send all elements in the full result iterator.
*
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending done and destroy because iterator ran out\n");
- send_client_done_and_destroy (op);
+ GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
+ op->state->full_result_iter = NULL;
+ if (PHASE_DONE_RECEIVED == op->state->phase)
+ {
+ op->state->phase = PHASE_FINISHED;
+ send_client_done_and_destroy (op);
+ }
+ else if (PHASE_MUST_SEND_DONE == op->state->phase)
+ {
+ send_p2p_done (op);
+ }
+ else
+ {
+ GNUNET_assert (0);
+ }
return;
}
ee = nxt;
element = &ee->element;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending element (size %u) to client (full set)\n",
+ "Sending element %s:%u to client (full set)\n",
+ GNUNET_h2s (&ee->element_hash),
element->size);
- GNUNET_assert (0 != op->spec->client_request_id);
+ GNUNET_assert (0 != op->client_request_id);
ev = GNUNET_MQ_msg_extra (rm,
element->size,
GNUNET_MESSAGE_TYPE_SET_RESULT);
GNUNET_assert (NULL != ev);
rm->result_status = htons (GNUNET_SET_STATUS_OK);
- rm->request_id = htonl (op->spec->client_request_id);
+ rm->request_id = htonl (op->client_request_id);
rm->element_type = element->element_type;
- memcpy (&rm[1],
- element->data,
- element->size);
+ GNUNET_memcpy (&rm[1],
+ element->data,
+ element->size);
GNUNET_MQ_notify_sent (ev,
&send_remaining_elements,
op);
- GNUNET_MQ_send (op->spec->set->client_mq,
+ GNUNET_MQ_send (op->set->cs->mq,
ev);
}
/**
- * Inform the peer that this operation is complete.
+ * Fills the "my_elements" hashmap with the initial set of
+ * (non-deleted) elements from the set of the specification.
*
- * @param op the intersection operation to fail
+ * @param cls closure with the `struct Operation *`
+ * @param key current key code for the element
+ * @param value value in the hash map with the `struct ElementEntry *`
+ * @return #GNUNET_YES (we should continue to iterate)
+ */
+static int
+initialize_map_unfiltered (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct ElementEntry *ee = value;
+ struct Operation *op = cls;
+
+ if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
+ return GNUNET_YES; /* element not live in operation's generation */
+ GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
+ &ee->element_hash,
+ &op->state->my_xor);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Initial full initialization of my_elements, adding %s:%u\n",
+ GNUNET_h2s (&ee->element_hash),
+ ee->element.size);
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
+ &ee->element_hash,
+ ee,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ return GNUNET_YES;
+}
+
+
+/**
+ * Send our element count to the peer, in case our element count is
+ * lower than his.
+ *
+ * @param op intersection operation
*/
static void
-send_peer_done (struct Operation *op)
+send_element_count (struct Operation *op)
{
struct GNUNET_MQ_Envelope *ev;
- struct IntersectionDoneMessage *idm;
+ struct IntersectionElementInfoMessage *msg;
- op->state->phase = PHASE_FINISHED;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Intersection succeeded, sending DONE\n");
- GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
- op->state->local_bf = NULL;
+ "Sending our element count (%u)\n",
+ op->state->my_element_count);
+ ev = GNUNET_MQ_msg (msg,
+ GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
+ msg->sender_element_count = htonl (op->state->my_element_count);
+ GNUNET_MQ_send (op->mq, ev);
+}
- ev = GNUNET_MQ_msg (idm,
- GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE);
- idm->final_element_count = htonl (op->state->my_element_count);
- idm->element_xor_hash = op->state->my_xor;
- GNUNET_MQ_send (op->mq,
- ev);
+
+/**
+ * We go first, initialize our map with all elements and
+ * send the first Bloom filter.
+ *
+ * @param op operation to start exchange for
+ */
+static void
+begin_bf_exchange (struct Operation *op)
+{
+ op->state->phase = PHASE_BF_EXCHANGE;
+ GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
+ &initialize_map_unfiltered,
+ op);
+ send_bloomfilter (op);
+}
+
+
+/**
+ * Handle the initial `struct IntersectionElementInfoMessage` from a
+ * remote peer.
+ *
+ * @param cls the intersection operation
+ * @param mh the header of the message
+ */
+void
+handle_intersection_p2p_element_info (void *cls,
+ const struct IntersectionElementInfoMessage *msg)
+{
+ struct Operation *op = cls;
+
+ if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
+ {
+ GNUNET_break_op (0);
+ fail_intersection_operation(op);
+ return;
+ }
+ op->remote_element_count = ntohl (msg->sender_element_count);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received remote element count (%u), I have %u\n",
+ op->remote_element_count,
+ op->state->my_element_count);
+ if ( ( (PHASE_INITIAL != op->state->phase) &&
+ (PHASE_COUNT_SENT != op->state->phase) ) ||
+ (op->state->my_element_count > op->remote_element_count) ||
+ (0 == op->state->my_element_count) ||
+ (0 == op->remote_element_count) )
+ {
+ GNUNET_break_op (0);
+ fail_intersection_operation(op);
+ return;
+ }
+ GNUNET_break (NULL == op->state->remote_bf);
+ begin_bf_exchange (op);
+ GNUNET_CADET_receive_done (op->channel);
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
op->state->phase,
- op->spec->remote_element_count,
+ op->remote_element_count,
op->state->my_element_count,
- GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements));
+ GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
switch (op->state->phase)
{
case PHASE_INITIAL:
case PHASE_COUNT_SENT:
/* This is the first BF being sent, build our initial map with
filtering in place */
- op->state->my_elements
- = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count,
- GNUNET_YES);
op->state->my_element_count = 0;
- GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
+ GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
&filtered_map_initialization,
op);
break;
&iterator_bf_reduce,
op);
break;
+ case PHASE_MUST_SEND_DONE:
+ GNUNET_break_op (0);
+ fail_intersection_operation(op);
+ return;
+ case PHASE_DONE_RECEIVED:
+ GNUNET_break_op (0);
+ fail_intersection_operation(op);
+ return;
case PHASE_FINISHED:
GNUNET_break_op (0);
fail_intersection_operation(op);
op->state->remote_bf = NULL;
if ( (0 == op->state->my_element_count) || /* fully disjoint */
- ( (op->state->my_element_count == op->spec->remote_element_count) &&
+ ( (op->state->my_element_count == op->remote_element_count) &&
(0 == memcmp (&op->state->my_xor,
&op->state->other_xor,
sizeof (struct GNUNET_HashCode))) ) )
{
/* we are done */
- send_peer_done (op);
+ op->state->phase = PHASE_MUST_SEND_DONE;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Intersection succeeded, sending DONE to other peer\n");
+ GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
+ op->state->local_bf = NULL;
+ if (GNUNET_SET_RESULT_FULL == op->result_mode)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending full result set (%u elements)\n",
+ GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
+ op->state->full_result_iter
+ = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
+ send_remaining_elements (op);
+ return;
+ }
+ send_p2p_done (op);
return;
}
op->state->phase = PHASE_BF_EXCHANGE;
}
+/**
+ * Check an BF message from a remote peer.
+ *
+ * @param cls the intersection operation
+ * @param msg the header of the message
+ * @return #GNUNET_OK if @a msg is well-formed
+ */
+int
+check_intersection_p2p_bf (void *cls,
+ const struct BFMessage *msg)
+{
+ struct Operation *op = cls;
+
+ if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
/**
* Handle an BF message from a remote peer.
*
* @param cls the intersection operation
- * @param mh the header of the message
+ * @param msg the header of the message
*/
-static void
-handle_p2p_bf (void *cls,
- const struct GNUNET_MessageHeader *mh)
+void
+handle_intersection_p2p_bf (void *cls,
+ const struct BFMessage *msg)
{
struct Operation *op = cls;
- const struct BFMessage *msg;
uint32_t bf_size;
uint32_t chunk_size;
uint32_t bf_bits_per_element;
- uint16_t msize;
- msize = htons (mh->size);
- if (msize < sizeof (struct BFMessage))
- {
- GNUNET_break_op (0);
- fail_intersection_operation (op);
- return;
- }
- msg = (const struct BFMessage *) mh;
switch (op->state->phase)
{
case PHASE_INITIAL:
GNUNET_break_op (0);
fail_intersection_operation (op);
- break;
+ return;
case PHASE_COUNT_SENT:
case PHASE_BF_EXCHANGE:
bf_size = ntohl (msg->bloomfilter_total_length);
bf_bits_per_element = ntohl (msg->bits_per_element);
- chunk_size = msize - sizeof (struct BFMessage);
+ chunk_size = htons (msg->header.size) - sizeof (struct BFMessage);
op->state->other_xor = msg->element_xor_hash;
if (bf_size == chunk_size)
{
bf_size,
bf_bits_per_element);
op->state->salt = ntohl (msg->sender_mutator);
- op->spec->remote_element_count = ntohl (msg->sender_element_count);
+ op->remote_element_count = ntohl (msg->sender_element_count);
process_bf (op);
- return;
+ break;
}
/* multipart chunk */
if (NULL == op->state->bf_data)
op->state->bf_bits_per_element = bf_bits_per_element;
op->state->bf_data_offset = 0;
op->state->salt = ntohl (msg->sender_mutator);
- op->spec->remote_element_count = ntohl (msg->sender_element_count);
+ op->remote_element_count = ntohl (msg->sender_element_count);
}
else
{
(op->state->bf_bits_per_element != bf_bits_per_element) ||
(op->state->bf_data_offset + chunk_size > bf_size) ||
(op->state->salt != ntohl (msg->sender_mutator)) ||
- (op->spec->remote_element_count != ntohl (msg->sender_element_count)) )
+ (op->remote_element_count != ntohl (msg->sender_element_count)) )
{
GNUNET_break_op (0);
fail_intersection_operation (op);
return;
}
}
- memcpy (&op->state->bf_data[op->state->bf_data_offset],
+ GNUNET_memcpy (&op->state->bf_data[op->state->bf_data_offset],
(const char*) &msg[1],
chunk_size);
op->state->bf_data_offset += chunk_size;
default:
GNUNET_break_op (0);
fail_intersection_operation (op);
- break;
- }
-}
-
-
-/**
- * Fills the "my_elements" hashmap with the initial set of
- * (non-deleted) elements from the set of the specification.
- *
- * @param cls closure with the `struct Operation *`
- * @param key current key code for the element
- * @param value value in the hash map with the `struct ElementEntry *`
- * @return #GNUNET_YES (we should continue to iterate)
- */
-static int
-initialize_map_unfiltered (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct ElementEntry *ee = value;
- struct Operation *op = cls;
-
- if ( (op->generation_created < ee->generation_removed) &&
- (op->generation_created >= ee->generation_added) )
- return GNUNET_YES; /* element not live in operation's generation */
- GNUNET_CRYPTO_hash_xor (&op->state->my_xor,
- &ee->element_hash,
- &op->state->my_xor);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Initial full initialization of my_elements, adding %s:%u\n",
- GNUNET_h2s (&ee->element_hash),
- ee->element.size);
- GNUNET_break (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
- &ee->element_hash,
- ee,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- return GNUNET_YES;
-}
-
-
-/**
- * Send our element count to the peer, in case our element count is
- * lower than his.
- *
- * @param op intersection operation
- */
-static void
-send_element_count (struct Operation *op)
-{
- struct GNUNET_MQ_Envelope *ev;
- struct IntersectionElementInfoMessage *msg;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending our element count (%u)\n",
- op->state->my_element_count);
- ev = GNUNET_MQ_msg (msg,
- GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
- msg->sender_element_count = htonl (op->state->my_element_count);
- GNUNET_MQ_send (op->mq, ev);
-}
-
-
-/**
- * We go first, initialize our map with all elements and
- * send the first Bloom filter.
- *
- * @param op operation to start exchange for
- */
-static void
-begin_bf_exchange (struct Operation *op)
-{
- op->state->phase = PHASE_BF_EXCHANGE;
- op->state->my_elements
- = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count,
- GNUNET_YES);
- GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
- &initialize_map_unfiltered,
- op);
- send_bloomfilter (op);
-}
-
-
-/**
- * Handle the initial `struct IntersectionElementInfoMessage` from a
- * remote peer.
- *
- * @param cls the intersection operation
- * @param mh the header of the message
- */
-static void
-handle_p2p_element_info (void *cls,
- const struct GNUNET_MessageHeader *mh)
-{
- struct Operation *op = cls;
- const struct IntersectionElementInfoMessage *msg;
-
- if (ntohs (mh->size) != sizeof (struct IntersectionElementInfoMessage))
- {
- GNUNET_break_op (0);
- fail_intersection_operation(op);
- return;
- }
- msg = (const struct IntersectionElementInfoMessage *) mh;
- op->spec->remote_element_count = ntohl (msg->sender_element_count);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received remote element count (%u), I have %u\n",
- op->spec->remote_element_count,
- op->state->my_element_count);
- if ( ( (PHASE_INITIAL != op->state->phase) &&
- (PHASE_COUNT_SENT != op->state->phase) ) ||
- (op->state->my_element_count > op->spec->remote_element_count) ||
- (0 == op->state->my_element_count) ||
- (0 == op->spec->remote_element_count) )
- {
- GNUNET_break_op (0);
- fail_intersection_operation(op);
return;
}
- GNUNET_break (NULL == op->state->remote_bf);
- begin_bf_exchange (op);
-}
-
-
-/**
- * Send a result message to the client indicating that the operation
- * is over. After the result done message has been sent to the
- * client, destroy the evaluate operation.
- *
- * @param op intersection operation
- */
-static void
-finish_and_destroy (struct Operation *op)
-{
- GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
-
- if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending full result set\n");
- op->state->full_result_iter
- = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
- send_remaining_elements (op);
- return;
- }
- send_client_done_and_destroy (op);
+ GNUNET_CADET_receive_done (op->channel);
}
* @param cls the intersection operation
* @param mh the message
*/
-static void
-handle_p2p_done (void *cls,
- const struct GNUNET_MessageHeader *mh)
+void
+handle_intersection_p2p_done (void *cls,
+ const struct IntersectionDoneMessage *idm)
{
struct Operation *op = cls;
- const struct IntersectionDoneMessage *idm;
- if (PHASE_BF_EXCHANGE != op->state->phase)
+ if (GNUNET_SET_OPERATION_INTERSECTION != op->set->operation)
{
- /* wrong phase to conclude? FIXME: Or should we allow this
- if the other peer has _initially_ already an empty set? */
GNUNET_break_op (0);
fail_intersection_operation (op);
return;
}
- if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage))
+ if (PHASE_BF_EXCHANGE != op->state->phase)
{
+ /* wrong phase to conclude? FIXME: Or should we allow this
+ if the other peer has _initially_ already an empty set? */
GNUNET_break_op (0);
fail_intersection_operation (op);
return;
}
- idm = (const struct IntersectionDoneMessage *) mh;
if (0 == ntohl (idm->final_element_count))
{
/* other peer determined empty set is the intersection,
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Got final DONE\n");
+ "Got IntersectionDoneMessage, have %u elements in intersection\n",
+ op->state->my_element_count);
+ op->state->phase = PHASE_DONE_RECEIVED;
+ GNUNET_CADET_receive_done (op->channel);
+
+ GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
+ if (GNUNET_SET_RESULT_FULL == op->result_mode)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending full result set to client (%u elements)\n",
+ GNUNET_CONTAINER_multihashmap_size (op->state->my_elements));
+ op->state->full_result_iter
+ = GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
+ send_remaining_elements (op);
+ return;
+ }
op->state->phase = PHASE_FINISHED;
- finish_and_destroy (op);
+ send_client_done_and_destroy (op);
}
* begin the evaluation
* @param opaque_context message to be transmitted to the listener
* to convince him to accept, may be NULL
+ * @return operation-specific state to keep in @a op
*/
-static void
+static struct OperationState *
intersection_evaluate (struct Operation *op,
const struct GNUNET_MessageHeader *opaque_context)
{
+ struct OperationState *state;
struct GNUNET_MQ_Envelope *ev;
struct OperationRequestMessage *msg;
- op->state = GNUNET_new (struct OperationState);
- /* we started the operation, thus we have to send the operation request */
- op->state->phase = PHASE_INITIAL;
- op->state->my_element_count = op->spec->set->state->current_set_element_count;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Initiating intersection operation evaluation\n");
ev = GNUNET_MQ_msg_nested_mh (msg,
GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
opaque_context);
{
/* the context message is too large!? */
GNUNET_break (0);
- GNUNET_SERVER_client_disconnect (op->spec->set->client);
- return;
+ return NULL;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Initiating intersection operation evaluation\n");
+ state = GNUNET_new (struct OperationState);
+ /* we started the operation, thus we have to send the operation request */
+ state->phase = PHASE_INITIAL;
+ state->my_element_count = op->set->state->current_set_element_count;
+ state->my_elements
+ = GNUNET_CONTAINER_multihashmap_create (state->my_element_count,
+ GNUNET_YES);
+
msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
- msg->app_id = op->spec->app_id;
- msg->element_count = htonl (op->state->my_element_count);
+ msg->element_count = htonl (state->my_element_count);
GNUNET_MQ_send (op->mq,
ev);
+ state->phase = PHASE_COUNT_SENT;
if (NULL != opaque_context)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sent op request with context message\n");
else
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sent op request without context message\n");
+ return state;
}
*
* @param op operation that will be accepted as an intersection operation
*/
-static void
+static struct OperationState *
intersection_accept (struct Operation *op)
{
+ struct OperationState *state;
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Accepting set intersection operation\n");
- op->state = GNUNET_new (struct OperationState);
- op->state->phase = PHASE_INITIAL;
- op->state->my_element_count
- = op->spec->set->state->current_set_element_count;
- op->state->my_elements
- = GNUNET_CONTAINER_multihashmap_create
- (GNUNET_MIN (op->state->my_element_count,
- op->spec->remote_element_count),
- GNUNET_YES);
- if (op->spec->remote_element_count < op->state->my_element_count)
+ state = GNUNET_new (struct OperationState);
+ state->phase = PHASE_INITIAL;
+ state->my_element_count
+ = op->set->state->current_set_element_count;
+ state->my_elements
+ = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (state->my_element_count,
+ op->remote_element_count),
+ GNUNET_YES);
+ op->state = state;
+ if (op->remote_element_count < state->my_element_count)
{
/* If the other peer (Alice) has fewer elements than us (Bob),
we just send the count as Alice should send the first BF */
send_element_count (op);
- op->state->phase = PHASE_COUNT_SENT;
- return;
+ state->phase = PHASE_COUNT_SENT;
+ return state;
}
/* We have fewer elements, so we start with the BF */
begin_bf_exchange (op);
-}
-
-
-/**
- * Dispatch messages for a intersection operation.
- *
- * @param op the state of the intersection evaluate operation
- * @param mh the received message
- * @return #GNUNET_SYSERR if the tunnel should be disconnected,
- * #GNUNET_OK otherwise
- */
-static int
-intersection_handle_p2p_message (struct Operation *op,
- const struct GNUNET_MessageHeader *mh)
-{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received p2p message (t: %u, s: %u)\n",
- ntohs (mh->type), ntohs (mh->size));
- switch (ntohs (mh->type))
- {
- /* this message handler is not active until after we received an
- * operation request message, thus the ops request is not handled here
- */
- case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
- handle_p2p_element_info (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
- handle_p2p_bf (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE:
- handle_p2p_done (op, mh);
- break;
- default:
- /* something wrong with cadet's message handlers? */
- GNUNET_assert (0);
- }
- return GNUNET_OK;
-}
-
-
-/**
- * Handler for peer-disconnects, notifies the client about the aborted
- * operation. If we did not expect anything from the other peer, we
- * gracefully terminate the operation.
- *
- * @param op the destroyed operation
- */
-static void
-intersection_peer_disconnect (struct Operation *op)
-{
- if (PHASE_FINISHED != op->state->phase)
- {
- fail_intersection_operation (op);
- return;
- }
- /* the session has already been concluded */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Other peer disconnected (finished)\n");
- if (GNUNET_NO == op->state->client_done_sent)
- finish_and_destroy (op);
+ return state;
}
GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
op->state->my_elements = NULL;
}
+ if (NULL != op->state->full_result_iter)
+ {
+ GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
+ op->state->full_result_iter = NULL;
+ }
GNUNET_free (op->state);
op->state = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
}
+/**
+ * Callback for channel death for the intersection operation.
+ *
+ * @param op operation that lost the channel
+ */
+static void
+intersection_channel_death (struct Operation *op)
+{
+ if (GNUNET_YES == op->state->channel_death_expected)
+ {
+ /* oh goodie, we are done! */
+ send_client_done_and_destroy (op);
+ }
+ else
+ {
+ /* sorry, channel went down early, too bad. */
+ _GSS_operation_destroy (op,
+ GNUNET_YES);
+ }
+}
+
+
/**
* Get the table with implementing functions for set intersection.
*
{
static const struct SetVT intersection_vt = {
.create = &intersection_set_create,
- .msg_handler = &intersection_handle_p2p_message,
.add = &intersection_add,
.remove = &intersection_remove,
.destroy_set = &intersection_set_destroy,
.evaluate = &intersection_evaluate,
.accept = &intersection_accept,
- .peer_disconnect = &intersection_peer_disconnect,
.cancel = &intersection_op_cancel,
+ .channel_death = &intersection_channel_death,
};
return &intersection_vt;