* @author Christian Grothoff
*/
#include "gnunet-service-set.h"
+#include "gnunet-service-set_union.h"
+#include "gnunet-service-set_intersection.h"
#include "gnunet-service-set_protocol.h"
#include "gnunet_statistics_service.h"
op->channel = NULL;
GNUNET_CADET_channel_destroy (channel);
}
+
if (GNUNET_YES == gc)
collect_generation_garbage (set);
/* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
{
struct Operation *curr = op;
op = op->next;
- if ( (GNUNET_YES == curr->is_incoming) &&
+ if ( (GNUNET_YES == curr->is_incoming) &&
(curr->listener == listener) )
incoming_destroy (curr);
}
}
+/**
+ * Check a request for a set operation from another peer.
+ *
+ * @param cls the operation state
+ * @param msg the received message
+ * @return #GNUNET_OK if the channel should be kept alive,
+ * #GNUNET_SYSERR to destroy the channel
+ */
+static int
+check_incoming_msg (void *cls,
+ const struct OperationRequestMessage *msg)
+{
+ struct Operation *op = cls;
+ const struct GNUNET_MessageHeader *nested_context;
+
+ /* double operation request */
+ if (NULL != op->spec)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ nested_context = GNUNET_MQ_extract_nested_mh (msg);
+ if ( (NULL != nested_context) &&
+ (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
/**
* Handle a request for a set operation from another peer. Checks if we
* have a listener waiting for such a request (and in that case initiates
* our virtual table and subsequent msgs would be routed differently (as
* we then know what type of operation this is).
*
- * @param op the operation state
- * @param mh the received message
+ * @param cls the operation state
+ * @param msg the received message
* @return #GNUNET_OK if the channel should be kept alive,
* #GNUNET_SYSERR to destroy the channel
*/
-static int
-handle_incoming_msg (struct Operation *op,
- const struct GNUNET_MessageHeader *mh)
+static void
+handle_incoming_msg (void *cls,
+ const struct OperationRequestMessage *msg)
{
- const struct OperationRequestMessage *msg;
+ struct Operation *op = cls;
struct Listener *listener = op->listener;
struct OperationSpecification *spec;
const struct GNUNET_MessageHeader *nested_context;
- msg = (const struct OperationRequestMessage *) mh;
GNUNET_assert (GNUNET_YES == op->is_incoming);
- if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
- /* double operation request */
- if (NULL != op->spec)
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
spec = GNUNET_new (struct OperationSpecification);
nested_context = GNUNET_MQ_extract_nested_mh (msg);
- if ( (NULL != nested_context) &&
- (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
- {
- GNUNET_break_op (0);
- GNUNET_free (spec);
- return GNUNET_SYSERR;
- }
/* Make a copy of the nested_context (application-specific context
information that is opaque to set) so we can pass it to the
listener later on */
spec->peer = op->peer;
spec->remote_element_count = ntohl (msg->element_count);
op->spec = spec;
-
listener = op->listener;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received P2P operation request (op %u, port %s) for active listener\n",
GNUNET_h2s (&listener->app_id));
incoming_suggest (op,
listener);
- return GNUNET_OK;
}
{
case GNUNET_SET_OPERATION_INTERSECTION:
set->vt = _GSS_intersection_vt ();
+ set->type = OT_INTERSECTION;
break;
case GNUNET_SET_OPERATION_UNION:
set->vt = _GSS_union_vt ();
+ set->type = OT_UNION;
break;
default:
GNUNET_free (set);
const struct GNUNET_PeerIdentity *source)
{
static const struct SetVT incoming_vt = {
- .msg_handler = &handle_incoming_msg,
.peer_disconnect = &handle_incoming_disconnect
};
struct Listener *listener = cls;
/* FIXME: not implemented, we could do flow control here... */
}
-/**
- * FIXME: hack-job. Migrate to proper handler array use!
- *
- * @param cls local state associated with the channel.
- * @param message The actual message.
- */
-static int
-check_p2p_message (void *cls,
- const struct GNUNET_MessageHeader *message)
-{
- return GNUNET_OK;
-}
-
-
-/**
- * FIXME: hack-job. Migrate to proper handler array use!
- *
- * Functions with this signature are called whenever a message is
- * received via a cadet channel.
- *
- * The msg_handler is a virtual table set in initially either when a peer
- * creates a new channel with us, or once we create a new channel
- * ourselves (evaluate).
- *
- * Once we know the exact type of operation (union/intersection), the vt is
- * replaced with an operation specific instance (_GSS_[op]_vt).
- *
- * @param cls local state associated with the channel.
- * @param message The actual message.
- */
-static void
-handle_p2p_message (void *cls,
- const struct GNUNET_MessageHeader *message)
-{
- struct Operation *op = cls;
- int ret;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Dispatching cadet message (type: %u)\n",
- ntohs (message->type));
- /* do this before the handler, as the handler might kill the channel */
- GNUNET_CADET_receive_done (op->channel);
- if (NULL != op->vt)
- ret = op->vt->msg_handler (op,
- message);
- else
- ret = GNUNET_SYSERR;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Handled cadet message (type: %u)\n",
- ntohs (message->type));
- if (GNUNET_OK != ret)
- GNUNET_CADET_channel_destroy (op->channel);
-}
-
/**
* Called when a client wants to create a new listener.
{
struct GNUNET_SERVICE_Client *client = cls;
struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (incoming_msg,
GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
- struct GNUNET_MessageHeader,
+ struct OperationRequestMessage,
NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_ibf,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
- struct GNUNET_MessageHeader,
+ struct IBFMessage,
NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_elements,
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
- struct GNUNET_MessageHeader,
+ struct GNUNET_SET_ElementMessage,
NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_offer,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
struct GNUNET_MessageHeader,
NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_inquiry,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
- struct GNUNET_MessageHeader,
+ struct InquiryMessage,
NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_demand,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
struct GNUNET_MessageHeader,
NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_fixed_size (union_p2p_done,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
- struct GNUNET_MessageHeader,
+ struct StrataEstimatorMessage,
NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
- struct GNUNET_MessageHeader,
+ struct StrataEstimatorMessage,
NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_full_element,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
- struct GNUNET_MessageHeader,
+ struct GNUNET_SET_ElementMessage,
NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
- GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
+ GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
+ struct IntersectionElementInfoMessage,
+ NULL),
+ GNUNET_MQ_hd_var_size (intersection_p2p_bf,
GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_hd_var_size (p2p_message,
- GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
- struct GNUNET_MessageHeader,
+ struct BFMessage,
NULL),
+ GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
+ GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
+ struct IntersectionDoneMessage,
+ NULL),
GNUNET_MQ_handler_end ()
};
struct Listener *listener;
struct GNUNET_SERVICE_Client *client = cls;
struct Operation *op = GNUNET_new (struct Operation);
const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (incoming_msg,
GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
- struct GNUNET_MessageHeader,
+ struct OperationRequestMessage,
op),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_ibf,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
- struct GNUNET_MessageHeader,
+ struct IBFMessage,
op),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_elements,
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
- struct GNUNET_MessageHeader,
+ struct GNUNET_SET_ElementMessage,
op),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_offer,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
struct GNUNET_MessageHeader,
op),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_inquiry,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
- struct GNUNET_MessageHeader,
+ struct InquiryMessage,
op),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_demand,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
struct GNUNET_MessageHeader,
op),
- GNUNET_MQ_hd_var_size (p2p_message,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
- struct GNUNET_MessageHeader,
- op),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_fixed_size (union_p2p_done,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
+ struct GNUNET_MessageHeader,
+ op),
+ GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+ struct GNUNET_MessageHeader,
+ op),
+ GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
+ GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
+ struct GNUNET_MessageHeader,
+ op),
+ GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
- struct GNUNET_MessageHeader,
+ struct StrataEstimatorMessage,
op),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
- struct GNUNET_MessageHeader,
- op),
- GNUNET_MQ_hd_var_size (p2p_message,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
- struct GNUNET_MessageHeader,
- op),
- GNUNET_MQ_hd_var_size (p2p_message,
- GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
- struct GNUNET_MessageHeader,
+ struct StrataEstimatorMessage,
op),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_var_size (union_p2p_full_element,
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
- struct GNUNET_MessageHeader,
- op),
- GNUNET_MQ_hd_var_size (p2p_message,
- GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
- struct GNUNET_MessageHeader,
+ struct GNUNET_SET_ElementMessage,
op),
- GNUNET_MQ_hd_var_size (p2p_message,
+ GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
+ GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
+ struct IntersectionElementInfoMessage,
+ op),
+ GNUNET_MQ_hd_var_size (intersection_p2p_bf,
GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
- struct GNUNET_MessageHeader,
- op),
- GNUNET_MQ_hd_var_size (p2p_message,
- GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
- struct GNUNET_MessageHeader,
+ struct BFMessage,
op),
+ GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
+ GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
+ struct IntersectionDoneMessage,
+ op),
GNUNET_MQ_handler_end ()
};
struct Set *set;
// mutations won't interfer with the running operation.
op->generation_created = set->current_generation;
advance_generation (set);
-
+ op->type = set->type;
op->vt = set->vt;
GNUNET_CONTAINER_DLL_insert (set->ops_head,
set->ops_tail,
{
case GNUNET_SET_OPERATION_INTERSECTION:
set->vt = _GSS_intersection_vt ();
+ set->type = OT_INTERSECTION;
break;
case GNUNET_SET_OPERATION_UNION:
set->vt = _GSS_union_vt ();
+ set->type = OT_UNION;
break;
default:
GNUNET_assert (0);
advance_generation (set);
op->vt = set->vt;
+ op->type = set->type;
op->vt->accept (op);
GNUNET_SERVICE_client_continue (client);
}
/*
This file is part of GNUnet
- Copyright (C) 2013, 2014 GNUnet e.V.
+ Copyright (C) 2013-2017 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
#include "gnunet-service-set.h"
#include "gnunet_block_lib.h"
#include "gnunet-service-set_protocol.h"
+#include "gnunet-service-set_intersection.h"
#include <gcrypt.h>
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending done and destroy because iterator ran out\n");
op->keep--;
+ GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
+ op->state->full_result_iter = NULL;
send_client_done_and_destroy (op);
return;
}
case PHASE_COUNT_SENT:
/* This is the first BF being sent, build our initial map with
filtering in place */
- op->state->my_elements
- = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count,
- GNUNET_YES);
op->state->my_element_count = 0;
GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
&filtered_map_initialization,
}
+/**
+ * Check an BF message from a remote peer.
+ *
+ * @param cls the intersection operation
+ * @param msg the header of the message
+ * @return #GNUNET_OK if @a msg is well-formed
+ */
+int
+check_intersection_p2p_bf (void *cls,
+ const struct BFMessage *msg)
+{
+ struct Operation *op = cls;
+
+ if (OT_INTERSECTION != op->type)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
/**
* Handle an BF message from a remote peer.
*
* @param cls the intersection operation
- * @param mh the header of the message
+ * @param msg the header of the message
*/
-static void
-handle_p2p_bf (void *cls,
- const struct GNUNET_MessageHeader *mh)
+void
+handle_intersection_p2p_bf (void *cls,
+ const struct BFMessage *msg)
{
struct Operation *op = cls;
- const struct BFMessage *msg;
uint32_t bf_size;
uint32_t chunk_size;
uint32_t bf_bits_per_element;
- uint16_t msize;
- msize = htons (mh->size);
- if (msize < sizeof (struct BFMessage))
- {
- GNUNET_break_op (0);
- fail_intersection_operation (op);
- return;
- }
- msg = (const struct BFMessage *) mh;
switch (op->state->phase)
{
case PHASE_INITIAL:
GNUNET_break_op (0);
fail_intersection_operation (op);
- break;
+ return;
case PHASE_COUNT_SENT:
case PHASE_BF_EXCHANGE:
bf_size = ntohl (msg->bloomfilter_total_length);
bf_bits_per_element = ntohl (msg->bits_per_element);
- chunk_size = msize - sizeof (struct BFMessage);
+ chunk_size = htons (msg->header.size) - sizeof (struct BFMessage);
op->state->other_xor = msg->element_xor_hash;
if (bf_size == chunk_size)
{
op->state->salt = ntohl (msg->sender_mutator);
op->spec->remote_element_count = ntohl (msg->sender_element_count);
process_bf (op);
- return;
+ break;
}
/* multipart chunk */
if (NULL == op->state->bf_data)
default:
GNUNET_break_op (0);
fail_intersection_operation (op);
- break;
+ return;
}
+ GNUNET_CADET_receive_done (op->channel);
}
begin_bf_exchange (struct Operation *op)
{
op->state->phase = PHASE_BF_EXCHANGE;
+ GNUNET_assert (NULL == op->state->my_elements);
op->state->my_elements
= GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count,
GNUNET_YES);
* @param cls the intersection operation
* @param mh the header of the message
*/
-static void
-handle_p2p_element_info (void *cls,
- const struct GNUNET_MessageHeader *mh)
+void
+handle_intersection_p2p_element_info (void *cls,
+ const struct IntersectionElementInfoMessage *msg)
{
struct Operation *op = cls;
- const struct IntersectionElementInfoMessage *msg;
- if (ntohs (mh->size) != sizeof (struct IntersectionElementInfoMessage))
+ if (OT_INTERSECTION != op->type)
{
GNUNET_break_op (0);
fail_intersection_operation(op);
return;
}
- msg = (const struct IntersectionElementInfoMessage *) mh;
op->spec->remote_element_count = ntohl (msg->sender_element_count);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received remote element count (%u), I have %u\n",
}
GNUNET_break (NULL == op->state->remote_bf);
begin_bf_exchange (op);
+ GNUNET_CADET_receive_done (op->channel);
}
* @param cls the intersection operation
* @param mh the message
*/
-static void
-handle_p2p_done (void *cls,
- const struct GNUNET_MessageHeader *mh)
+void
+handle_intersection_p2p_done (void *cls,
+ const struct IntersectionDoneMessage *idm)
{
struct Operation *op = cls;
- const struct IntersectionDoneMessage *idm;
- if (PHASE_BF_EXCHANGE != op->state->phase)
+ if (OT_INTERSECTION != op->type)
{
- /* wrong phase to conclude? FIXME: Or should we allow this
- if the other peer has _initially_ already an empty set? */
GNUNET_break_op (0);
- fail_intersection_operation (op);
+ fail_intersection_operation(op);
return;
}
- if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage))
+ if (PHASE_BF_EXCHANGE != op->state->phase)
{
+ /* wrong phase to conclude? FIXME: Or should we allow this
+ if the other peer has _initially_ already an empty set? */
GNUNET_break_op (0);
fail_intersection_operation (op);
return;
}
- idm = (const struct IntersectionDoneMessage *) mh;
if (0 == ntohl (idm->final_element_count))
{
/* other peer determined empty set is the intersection,
op->state->my_element_count);
op->state->phase = PHASE_FINISHED;
finish_and_destroy (op);
+ GNUNET_CADET_receive_done (op->channel);
}
op->state->phase = PHASE_INITIAL;
op->state->my_element_count
= op->spec->set->state->current_set_element_count;
+ GNUNET_assert (NULL == op->state->my_elements);
op->state->my_elements
- = GNUNET_CONTAINER_multihashmap_create
- (GNUNET_MIN (op->state->my_element_count,
- op->spec->remote_element_count),
- GNUNET_YES);
+ = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (op->state->my_element_count,
+ op->spec->remote_element_count),
+ GNUNET_YES);
if (op->spec->remote_element_count < op->state->my_element_count)
{
/* If the other peer (Alice) has fewer elements than us (Bob),
}
-/**
- * Dispatch messages for a intersection operation.
- *
- * @param op the state of the intersection evaluate operation
- * @param mh the received message
- * @return #GNUNET_SYSERR if the tunnel should be disconnected,
- * #GNUNET_OK otherwise
- */
-static int
-intersection_handle_p2p_message (struct Operation *op,
- const struct GNUNET_MessageHeader *mh)
-{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received p2p message (t: %u, s: %u)\n",
- ntohs (mh->type), ntohs (mh->size));
- switch (ntohs (mh->type))
- {
- /* this message handler is not active until after we received an
- * operation request message, thus the ops request is not handled here
- */
- case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
- handle_p2p_element_info (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
- handle_p2p_bf (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE:
- handle_p2p_done (op, mh);
- break;
- default:
- /* something wrong with cadet's message handlers? */
- GNUNET_assert (0);
- }
- return GNUNET_OK;
-}
-
-
/**
* Handler for peer-disconnects, notifies the client about the aborted
* operation. If we did not expect anything from the other peer, we
GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
op->state->my_elements = NULL;
}
+ if (NULL != op->state->full_result_iter)
+ {
+ GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
+ op->state->full_result_iter = NULL;
+ }
GNUNET_free (op->state);
op->state = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
{
static const struct SetVT intersection_vt = {
.create = &intersection_set_create,
- .msg_handler = &intersection_handle_p2p_message,
.add = &intersection_add,
.remove = &intersection_remove,
.destroy_set = &intersection_set_destroy,
/*
This file is part of GNUnet
- Copyright (C) 2013-2016 GNUnet e.V.
+ Copyright (C) 2013-2017 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
*/
/**
* @file set/gnunet-service-set_union.c
-
* @brief two-peer set operations
* @author Florian Dold
+ * @author Christian Grothoff
*/
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_statistics_service.h"
#include "gnunet-service-set.h"
#include "ibf.h"
+#include "gnunet-service-set_union.h"
#include "gnunet-service-set_union_strata_estimator.h"
#include "gnunet-service-set_protocol.h"
#include <gcrypt.h>
* Handle a strata estimator from a remote peer
*
* @param cls the union operation
- * @param mh the message
- * @param is_compressed #GNUNET_YES if the estimator is compressed
- * @return #GNUNET_SYSERR if the tunnel should be disconnected,
- * #GNUNET_OK otherwise
+ * @param msg the message
*/
-static int
-handle_p2p_strata_estimator (void *cls,
- const struct GNUNET_MessageHeader *mh,
- int is_compressed)
+int
+check_union_p2p_strata_estimator (void *cls,
+ const struct StrataEstimatorMessage *msg)
{
struct Operation *op = cls;
- struct StrataEstimator *remote_se;
- struct StrataEstimatorMessage *msg = (void *) mh;
- unsigned int diff;
- uint64_t other_size;
+ int is_compressed;
size_t len;
- GNUNET_STATISTICS_update (_GSS_statistics,
- "# bytes of SE received",
- ntohs (mh->size),
- GNUNET_NO);
-
if (op->state->phase != PHASE_EXPECT_SE)
{
GNUNET_break (0);
- fail_union_operation (op);
return GNUNET_SYSERR;
}
- len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
+ is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
+ len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
if ( (GNUNET_NO == is_compressed) &&
(len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
{
- fail_union_operation (op);
GNUNET_break (0);
return GNUNET_SYSERR;
}
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handle a strata estimator from a remote peer
+ *
+ * @param cls the union operation
+ * @param msg the message
+ */
+void
+handle_union_p2p_strata_estimator (void *cls,
+ const struct StrataEstimatorMessage *msg)
+{
+ struct Operation *op = cls;
+ struct StrataEstimator *remote_se;
+ unsigned int diff;
+ uint64_t other_size;
+ size_t len;
+ int is_compressed;
+
+ is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
+ GNUNET_STATISTICS_update (_GSS_statistics,
+ "# bytes of SE received",
+ ntohs (msg->header.size),
+ GNUNET_NO);
+ len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
other_size = GNUNET_ntohll (msg->set_size);
remote_se = strata_estimator_create (SE_STRATA_COUNT,
SE_IBF_SIZE,
{
/* insufficient resources, fail */
fail_union_operation (op);
- return GNUNET_SYSERR;
+ return;
}
if (GNUNET_OK !=
strata_estimator_read (&msg[1],
remote_se))
{
/* decompression failed */
- fail_union_operation (op);
strata_estimator_destroy (remote_se);
- return GNUNET_SYSERR;
+ fail_union_operation (op);
+ return;
}
GNUNET_assert (NULL != op->state->se);
diff = strata_estimator_difference (remote_se,
op->state->se);
if (diff > 200)
- diff = diff * 3 / 2;
-
-
+ diff = diff * 3 / 2;
strata_estimator_destroy (remote_se);
strata_estimator_destroy (op->state->se);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"got se diff=%d, using ibf size %d\n",
diff,
- 1<<get_order_from_difference (diff));
+ 1U << get_order_from_difference (diff));
{
char *set_debug;
+
set_debug = getenv ("GNUNET_SET_BENCHMARK");
- if ( (NULL != set_debug) && (0 == strcmp (set_debug, "1")) )
+ if ( (NULL != set_debug) &&
+ (0 == strcmp (set_debug, "1")) )
{
FILE *f = fopen ("set.log", "a");
fprintf (f, "%llu\n", (unsigned long long) diff);
}
}
- if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
+ if ( (GNUNET_YES == op->spec->byzantine) &&
+ (other_size < op->spec->byzantine_lower_bound) )
{
GNUNET_break (0);
fail_union_operation (op);
- return GNUNET_SYSERR;
+ return;
}
-
- if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 4))
+ if ( (GNUNET_YES == op->spec->force_full) ||
+ (diff > op->state->initial_size / 4))
{
LOG (GNUNET_ERROR_TYPE_INFO,
"Sending full set (diff=%d, own set=%u)\n",
else
{
struct GNUNET_MQ_Envelope *ev;
+
op->state->phase = PHASE_EXPECT_IBF;
ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
GNUNET_MQ_send (op->mq, ev);
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to send IBF, closing connection\n");
fail_union_operation (op);
- return GNUNET_SYSERR;
+ return;
}
}
-
- return GNUNET_OK;
+ GNUNET_CADET_receive_done (op->channel);
}
/**
- * Handle an IBF message from a remote peer.
+ * Check an IBF message from a remote peer.
*
* Reassemble the IBF from multiple pieces, and
* process the whole IBF once possible.
*
* @param cls the union operation
- * @param mh the header of the message
- * @return #GNUNET_SYSERR if the tunnel should be disconnected,
- * #GNUNET_OK otherwise
+ * @param msg the header of the message
+ * @return #GNUNET_OK if @a msg is well-formed
*/
-static int
-handle_p2p_ibf (void *cls,
- const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_ibf (void *cls,
+ const struct IBFMessage *msg)
{
struct Operation *op = cls;
- const struct IBFMessage *msg;
unsigned int buckets_in_message;
- if (ntohs (mh->size) < sizeof (struct IBFMessage))
+ if (OT_UNION != op->type)
{
GNUNET_break_op (0);
- fail_union_operation (op);
return GNUNET_SYSERR;
}
- msg = (const struct IBFMessage *) mh;
- if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
- (op->state->phase == PHASE_EXPECT_IBF) )
+ buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
+ if (0 == buckets_in_message)
{
- op->state->phase = PHASE_EXPECT_IBF_CONT;
- GNUNET_assert (NULL == op->state->remote_ibf);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Creating new ibf of size %u\n",
- 1 << msg->order);
- op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
- op->state->salt_receive = ntohl (msg->salt);
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
- if (NULL == op->state->remote_ibf)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to parse remote IBF, closing connection\n");
- fail_union_operation (op);
- return GNUNET_SYSERR;
- }
- op->state->ibf_buckets_received = 0;
- if (0 != ntohl (msg->offset))
- {
- GNUNET_break_op (0);
- fail_union_operation (op);
- return GNUNET_SYSERR;
- }
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
}
- else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
+ if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ if (op->state->phase == PHASE_EXPECT_IBF_CONT)
{
if (ntohl (msg->offset) != op->state->ibf_buckets_received)
{
GNUNET_break_op (0);
- fail_union_operation (op);
return GNUNET_SYSERR;
}
if (1<<msg->order != op->state->remote_ibf->size)
{
GNUNET_break_op (0);
- fail_union_operation (op);
return GNUNET_SYSERR;
}
if (ntohl (msg->salt) != op->state->salt_receive)
{
GNUNET_break_op (0);
- fail_union_operation (op);
return GNUNET_SYSERR;
}
}
- else
+ else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
+ (op->state->phase != PHASE_EXPECT_IBF) )
{
- GNUNET_assert (0);
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
}
- buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
+ return GNUNET_OK;
+}
- if (0 == buckets_in_message)
+
+/**
+ * Handle an IBF message from a remote peer.
+ *
+ * Reassemble the IBF from multiple pieces, and
+ * process the whole IBF once possible.
+ *
+ * @param cls the union operation
+ * @param msg the header of the message
+ */
+void
+handle_union_p2p_ibf (void *cls,
+ const struct IBFMessage *msg)
+{
+ struct Operation *op = cls;
+ unsigned int buckets_in_message;
+
+ buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
+ if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
+ (op->state->phase == PHASE_EXPECT_IBF) )
{
- GNUNET_break_op (0);
- fail_union_operation (op);
- return GNUNET_SYSERR;
+ op->state->phase = PHASE_EXPECT_IBF_CONT;
+ GNUNET_assert (NULL == op->state->remote_ibf);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating new ibf of size %u\n",
+ 1 << msg->order);
+ op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
+ op->state->salt_receive = ntohl (msg->salt);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Receiving new IBF with salt %u\n",
+ op->state->salt_receive);
+ if (NULL == op->state->remote_ibf)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to parse remote IBF, closing connection\n");
+ fail_union_operation (op);
+ return;
+ }
+ op->state->ibf_buckets_received = 0;
+ if (0 != ntohl (msg->offset))
+ {
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
}
-
- if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
+ else
{
- GNUNET_break_op (0);
- fail_union_operation (op);
- return GNUNET_SYSERR;
+ GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
}
-
GNUNET_assert (NULL != op->state->remote_ibf);
ibf_read_slice (&msg[1],
/* Internal error, best we can do is shut down */
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to decode IBF, closing connection\n");
- return GNUNET_SYSERR;
+ fail_union_operation (op);
+ return;
}
}
- return GNUNET_OK;
+ GNUNET_CADET_receive_done (op->channel);
}
}
+/**
+ * Tests if the operation is finished, and if so notify.
+ *
+ * @param op operation to check
+ */
static void
maybe_finish (struct Operation *op)
{
/**
- * Handle an element message from a remote peer.
- * Sent by the other peer either because we decoded an IBF and placed a demand,
- * or because the other peer switched to full set transmission.
+ * Check an element message from a remote peer.
*
* @param cls the union operation
- * @param mh the message
+ * @param emsg the message
*/
-static void
-handle_p2p_elements (void *cls,
- const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_elements (void *cls,
+ const struct GNUNET_SET_ElementMessage *emsg)
{
struct Operation *op = cls;
- struct ElementEntry *ee;
- const struct GNUNET_SET_ElementMessage *emsg;
- uint16_t element_size;
- if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
+ if (OT_UNION != op->type)
{
GNUNET_break_op (0);
- fail_union_operation (op);
- return;
+ return GNUNET_SYSERR;
}
- if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
+ if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
{
GNUNET_break_op (0);
- fail_union_operation (op);
- return;
+ return GNUNET_SYSERR;
}
+ return GNUNET_OK;
+}
+
- emsg = (const struct GNUNET_SET_ElementMessage *) mh;
+/**
+ * Handle an element message from a remote peer.
+ * Sent by the other peer either because we decoded an IBF and placed a demand,
+ * or because the other peer switched to full set transmission.
+ *
+ * @param cls the union operation
+ * @param emsg the message
+ */
+void
+handle_union_p2p_elements (void *cls,
+ const struct GNUNET_SET_ElementMessage *emsg)
+{
+ struct Operation *op = cls;
+ struct ElementEntry *ee;
+ struct KeyEntry *ke;
+ uint16_t element_size;
- element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
+ element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
- GNUNET_memcpy (&ee[1], &emsg[1], element_size);
+ GNUNET_memcpy (&ee[1],
+ &emsg[1],
+ element_size);
ee->element.size = element_size;
ee->element.data = &ee[1];
ee->element.element_type = ntohs (emsg->element_type);
ee->remote = GNUNET_YES;
- GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
-
+ GNUNET_SET_element_hash (&ee->element,
+ &ee->element_hash);
if (GNUNET_NO ==
GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
&ee->element_hash,
{
/* We got something we didn't demand, since it's not in our map. */
GNUNET_break_op (0);
- GNUNET_free (ee);
fail_union_operation (op);
return;
}
1,
GNUNET_NO);
- op->state->received_total += 1;
-
- struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+ op->state->received_total++;
+ ke = op_get_element (op, &ee->element_hash);
if (NULL != ke)
{
/* Got repeated element. Should not happen since
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Registering new element from remote peer\n");
- op->state->received_fresh += 1;
+ op->state->received_fresh++;
op_register_element (op, ee, GNUNET_YES);
/* only send results immediately if the client wants it */
switch (op->spec->result_mode)
}
}
- if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
+ if ( (op->state->received_total > 8) &&
+ (op->state->received_fresh < op->state->received_total / 3) )
{
/* The other peer gave us lots of old elements, there's something wrong. */
GNUNET_break_op (0);
fail_union_operation (op);
return;
}
-
+ GNUNET_CADET_receive_done (op->channel);
maybe_finish (op);
}
/**
- * Handle an element message from a remote peer.
+ * Check a full element message from a remote peer.
*
* @param cls the union operation
- * @param mh the message
+ * @param emsg the message
*/
-static void
-handle_p2p_full_element (void *cls,
- const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_full_element (void *cls,
+ const struct GNUNET_SET_ElementMessage *emsg)
{
struct Operation *op = cls;
- struct ElementEntry *ee;
- const struct GNUNET_SET_ElementMessage *emsg;
- uint16_t element_size;
- if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
+ if (OT_UNION != op->type)
{
GNUNET_break_op (0);
- fail_union_operation (op);
- return;
+ return GNUNET_SYSERR;
}
+ // FIXME: check that we expect full elements here?
+ return GNUNET_OK;
+}
- emsg = (const struct GNUNET_SET_ElementMessage *) mh;
- element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
+/**
+ * Handle an element message from a remote peer.
+ *
+ * @param cls the union operation
+ * @param emsg the message
+ */
+void
+handle_union_p2p_full_element (void *cls,
+ const struct GNUNET_SET_ElementMessage *emsg)
+{
+ struct Operation *op = cls;
+ struct ElementEntry *ee;
+ struct KeyEntry *ke;
+ uint16_t element_size;
+
+ element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
GNUNET_memcpy (&ee[1], &emsg[1], element_size);
ee->element.size = element_size;
1,
GNUNET_NO);
- op->state->received_total += 1;
-
- struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+ op->state->received_total++;
+ ke = op_get_element (op, &ee->element_hash);
if (NULL != ke)
{
/* Got repeated element. Should not happen since
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Registering new element from remote peer\n");
- op->state->received_fresh += 1;
+ op->state->received_fresh++;
op_register_element (op, ee, GNUNET_YES);
/* only send results immediately if the client wants it */
switch (op->spec->result_mode)
}
}
- if ( (GNUNET_YES == op->spec->byzantine) &&
- (op->state->received_total > 384 + op->state->received_fresh * 4) &&
+ if ( (GNUNET_YES == op->spec->byzantine) &&
+ (op->state->received_total > 384 + op->state->received_fresh * 4) &&
(op->state->received_fresh < op->state->received_total / 6) )
{
/* The other peer gave us lots of old elements, there's something wrong. */
fail_union_operation (op);
return;
}
+ GNUNET_CADET_receive_done (op->channel);
}
+
/**
* Send offers (for GNUNET_Hash-es) in response
* to inquiries (for IBF_Key-s).
*
* @param cls the union operation
- * @param mh the message
+ * @param msg the message
*/
-static void
-handle_p2p_inquiry (void *cls,
- const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_inquiry (void *cls,
+ const struct InquiryMessage *msg)
{
struct Operation *op = cls;
- const struct IBF_Key *ibf_key;
unsigned int num_keys;
- struct InquiryMessage *msg;
- /* look up elements and send them */
+ if (OT_UNION != op->type)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
if (op->state->phase != PHASE_INVENTORY_PASSIVE)
{
GNUNET_break_op (0);
- fail_union_operation (op);
- return;
+ return GNUNET_SYSERR;
}
- num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
- / sizeof (struct IBF_Key);
- if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
+ num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
+ / sizeof (struct IBF_Key);
+ if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage))
!= num_keys * sizeof (struct IBF_Key))
{
GNUNET_break_op (0);
- fail_union_operation (op);
- return;
+ return GNUNET_SYSERR;
}
+ return GNUNET_OK;
+}
- msg = (struct InquiryMessage *) mh;
+/**
+ * Send offers (for GNUNET_Hash-es) in response
+ * to inquiries (for IBF_Key-s).
+ *
+ * @param cls the union operation
+ * @param msg the message
+ */
+void
+handle_union_p2p_inquiry (void *cls,
+ const struct InquiryMessage *msg)
+{
+ struct Operation *op = cls;
+ const struct IBF_Key *ibf_key;
+ unsigned int num_keys;
+
+ num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
+ / sizeof (struct IBF_Key);
ibf_key = (const struct IBF_Key *) &msg[1];
while (0 != num_keys--)
{
struct IBF_Key unsalted_key;
+
unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
send_offers_for_key (op, unsalted_key);
ibf_key++;
}
+ GNUNET_CADET_receive_done (op->channel);
}
/**
- * Handle a
+ * Handle a request for full set transmission.
*
* @parem cls closure, a set union operation
* @param mh the demand message
*/
-static void
-handle_p2p_request_full (void *cls,
- const struct GNUNET_MessageHeader *mh)
+void
+handle_union_p2p_request_full (void *cls,
+ const struct GNUNET_MessageHeader *mh)
{
struct Operation *op = cls;
- if (PHASE_EXPECT_IBF != op->state->phase)
+ if (OT_UNION != op->type)
{
+ GNUNET_break_op (0);
fail_union_operation (op);
+ return;
+ }
+ if (PHASE_EXPECT_IBF != op->state->phase)
+ {
GNUNET_break_op (0);
+ fail_union_operation (op);
return;
}
// FIXME: we need to check that our set is larger than the
// byzantine_lower_bound by some threshold
send_full_set (op);
+ GNUNET_CADET_receive_done (op->channel);
}
* @parem cls closure, a set union operation
* @param mh the demand message
*/
-static void
-handle_p2p_full_done (void *cls,
- const struct GNUNET_MessageHeader *mh)
+void
+handle_union_p2p_full_done (void *cls,
+ const struct GNUNET_MessageHeader *mh)
{
struct Operation *op = cls;
- if (PHASE_EXPECT_IBF == op->state->phase)
+ switch (op->state->phase)
{
- struct GNUNET_MQ_Envelope *ev;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
+ case PHASE_EXPECT_IBF:
+ {
+ struct GNUNET_MQ_Envelope *ev;
- /* send all the elements that did not come from the remote peer */
- GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
- &send_missing_elements_iter,
- op);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "got FULL DONE, sending elements that other peer is missing\n");
- ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
- GNUNET_MQ_send (op->mq, ev);
- op->state->phase = PHASE_DONE;
+ /* send all the elements that did not come from the remote peer */
+ GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+ &send_missing_elements_iter,
+ op);
- /* we now wait until the other peer shuts the tunnel down*/
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+ GNUNET_MQ_send (op->mq, ev);
+ op->state->phase = PHASE_DONE;
+ /* we now wait until the other peer shuts the tunnel down*/
+ }
+ break;
+ case PHASE_FULL_SENDING:
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "got FULL DONE, finishing\n");
+ /* We sent the full set, and got the response for that. We're done. */
+ op->state->phase = PHASE_DONE;
+ GNUNET_CADET_receive_done (op->channel);
+ send_done_and_destroy (op);
+ return;
+ }
+ break;
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Handle full done phase is %u\n",
+ (unsigned) op->state->phase);
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
}
- else if (PHASE_FULL_SENDING == op->state->phase)
+ GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Check a demand by the other peer for elements based on a list
+ * of `struct GNUNET_HashCode`s.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ * @return #GNUNET_OK if @a mh is well-formed
+ */
+int
+check_union_p2p_demand (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+ unsigned int num_hashes;
+
+ if (OT_UNION != op->type)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
- /* We sent the full set, and got the response for that. We're done. */
- op->state->phase = PHASE_DONE;
- send_done_and_destroy (op);
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
}
- else
+ num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+ / sizeof (struct GNUNET_HashCode);
+ if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+ != num_hashes * sizeof (struct GNUNET_HashCode))
{
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
GNUNET_break_op (0);
- fail_union_operation (op);
- return;
+ return GNUNET_SYSERR;
}
+ return GNUNET_OK;
}
/**
* Handle a demand by the other peer for elements based on a list
- * of GNUNET_HashCode-s.
+ * of `struct GNUNET_HashCode`s.
*
* @parem cls closure, a set union operation
* @param mh the demand message
*/
-static void
-handle_p2p_demand (void *cls,
- const struct GNUNET_MessageHeader *mh)
+void
+handle_union_p2p_demand (void *cls,
+ const struct GNUNET_MessageHeader *mh)
{
struct Operation *op = cls;
struct ElementEntry *ee;
num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
/ sizeof (struct GNUNET_HashCode);
- if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
- != num_hashes * sizeof (struct GNUNET_HashCode))
- {
- GNUNET_break_op (0);
- fail_union_operation (op);
- return;
- }
-
for (hash = (const struct GNUNET_HashCode *) &mh[1];
num_hashes > 0;
hash++, num_hashes--)
{
- ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
+ ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
+ hash);
if (NULL == ee)
{
/* Demand for non-existing element. */
break;
}
}
+ GNUNET_CADET_receive_done (op->channel);
}
/**
- * Handle offers (of GNUNET_HashCode-s) and
- * respond with demands (of GNUNET_HashCode-s).
+ * Check offer (of `struct GNUNET_HashCode`s).
*
* @param cls the union operation
* @param mh the message
+ * @return #GNUNET_OK if @a mh is well-formed
*/
-static void
-handle_p2p_offer (void *cls,
- const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_offer (void *cls,
+ const struct GNUNET_MessageHeader *mh)
{
struct Operation *op = cls;
- const struct GNUNET_HashCode *hash;
unsigned int num_hashes;
+ if (OT_UNION != op->type)
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
/* look up elements and send them */
if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
(op->state->phase != PHASE_INVENTORY_ACTIVE))
{
GNUNET_break_op (0);
- fail_union_operation (op);
- return;
+ return GNUNET_SYSERR;
}
num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
/ sizeof (struct GNUNET_HashCode);
!= num_hashes * sizeof (struct GNUNET_HashCode))
{
GNUNET_break_op (0);
- fail_union_operation (op);
- return;
+ return GNUNET_SYSERR;
}
+ return GNUNET_OK;
+}
+
+/**
+ * Handle offers (of `struct GNUNET_HashCode`s) and
+ * respond with demands (of `struct GNUNET_HashCode`s).
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+void
+handle_union_p2p_offer (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct Operation *op = cls;
+ const struct GNUNET_HashCode *hash;
+ unsigned int num_hashes;
+
+ num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+ / sizeof (struct GNUNET_HashCode);
for (hash = (const struct GNUNET_HashCode *) &mh[1];
num_hashes > 0;
hash++, num_hashes--)
*(struct GNUNET_HashCode *) &demands[1] = *hash;
GNUNET_MQ_send (op->mq, ev);
}
+ GNUNET_CADET_receive_done (op->channel);
}
* @param cls the union operation
* @param mh the message
*/
-static void
-handle_p2p_done (void *cls,
- const struct GNUNET_MessageHeader *mh)
+void
+handle_union_p2p_done (void *cls,
+ const struct GNUNET_MessageHeader *mh)
{
struct Operation *op = cls;
- if (op->state->phase == PHASE_INVENTORY_PASSIVE)
+ if (OT_UNION != op->type)
{
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
+ }
+ switch (op->state->phase)
+ {
+ case PHASE_INVENTORY_PASSIVE:
/* We got all requests, but still have to send our elements in response. */
-
op->state->phase = PHASE_FINISH_WAITING;
LOG (GNUNET_ERROR_TYPE_DEBUG,
* all our demands are satisfied, so that the active
* peer can quit if we gave him everything.
*/
+ GNUNET_CADET_receive_done (op->channel);
maybe_finish (op);
return;
- }
- if (op->state->phase == PHASE_INVENTORY_ACTIVE)
- {
+ case PHASE_INVENTORY_ACTIVE:
LOG (GNUNET_ERROR_TYPE_DEBUG,
"got DONE (as active partner), waiting to finish\n");
/* All demands of the other peer are satisfied,
* to the other peer once our demands are met.
*/
op->state->phase = PHASE_FINISH_CLOSING;
+ GNUNET_CADET_receive_done (op->channel);
maybe_finish (op);
return;
+ default:
+ GNUNET_break_op (0);
+ fail_union_operation (op);
+ return;
}
- GNUNET_break_op (0);
- fail_union_operation (op);
}
}
-/**
- * Dispatch messages for a union operation.
- *
- * @param op the state of the union evaluate operation
- * @param mh the received message
- * @return #GNUNET_SYSERR if the tunnel should be disconnected,
- * #GNUNET_OK otherwise
- */
-int
-union_handle_p2p_message (struct Operation *op,
- const struct GNUNET_MessageHeader *mh)
-{
- //LOG (GNUNET_ERROR_TYPE_DEBUG,
- // "received p2p message (t: %u, s: %u)\n",
- // ntohs (mh->type),
- // ntohs (mh->size));
- switch (ntohs (mh->type))
- {
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
- return handle_p2p_ibf (op, mh);
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
- return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
- return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
- case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
- handle_p2p_elements (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
- handle_p2p_full_element (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
- handle_p2p_inquiry (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
- handle_p2p_done (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
- handle_p2p_offer (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
- handle_p2p_demand (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
- handle_p2p_full_done (op, mh);
- break;
- case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
- handle_p2p_request_full (op, mh);
- break;
- default:
- /* Something wrong with cadet's message handlers? */
- GNUNET_assert (0);
- }
- return GNUNET_OK;
-}
-
-
/**
* Handler for peer-disconnects, notifies the client
* about the aborted operation in case the op was not concluded.
{
static const struct SetVT union_vt = {
.create = &union_set_create,
- .msg_handler = &union_handle_p2p_message,
.add = &union_add,
.remove = &union_remove,
.destroy_set = &union_set_destroy,