From: Christian Grothoff Date: Sat, 11 Mar 2017 17:15:38 +0000 (+0100) Subject: cleaning up set handlers, eliminating 2nd level demultiplexing and improving use... X-Git-Tag: gnunet-0.11.0rc0~288^2~3 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=abdec5e11ff11bb10d32c013e11344a54786f80f;p=oweals%2Fgnunet.git cleaning up set handlers, eliminating 2nd level demultiplexing and improving use of types --- diff --git a/src/set/Makefile.am b/src/set/Makefile.am index cfe95bc1a..03c258352 100644 --- a/src/set/Makefile.am +++ b/src/set/Makefile.am @@ -51,7 +51,7 @@ gnunet_set_ibf_profiler_LDADD = \ gnunet_service_set_SOURCES = \ gnunet-service-set.c gnunet-service-set.h \ - gnunet-service-set_union.c \ + gnunet-service-set_union.c gnunet-service-set_union.h \ gnunet-service-set_intersection.c \ ibf.c ibf.h \ gnunet-service-set_union_strata_estimator.c gnunet-service-set_union_strata_estimator.h \ diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index 454ad9784..8f1506c6a 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c @@ -24,6 +24,8 @@ * @author Christian Grothoff */ #include "gnunet-service-set.h" +#include "gnunet-service-set_union.h" +#include "gnunet-service-set_intersection.h" #include "gnunet-service-set_protocol.h" #include "gnunet_statistics_service.h" @@ -476,6 +478,7 @@ _GSS_operation_destroy (struct Operation *op, op->channel = NULL; GNUNET_CADET_channel_destroy (channel); } + if (GNUNET_YES == gc) collect_generation_garbage (set); /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL, @@ -682,7 +685,7 @@ client_disconnect_cb (void *cls, { struct Operation *curr = op; op = op->next; - if ( (GNUNET_YES == curr->is_incoming) && + if ( (GNUNET_YES == curr->is_incoming) && (curr->listener == listener) ) incoming_destroy (curr); } @@ -732,6 +735,38 @@ incoming_suggest (struct Operation *incoming, } +/** + * Check a request for a set operation from another peer. + * + * @param cls the operation state + * @param msg the received message + * @return #GNUNET_OK if the channel should be kept alive, + * #GNUNET_SYSERR to destroy the channel + */ +static int +check_incoming_msg (void *cls, + const struct OperationRequestMessage *msg) +{ + struct Operation *op = cls; + const struct GNUNET_MessageHeader *nested_context; + + /* double operation request */ + if (NULL != op->spec) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + nested_context = GNUNET_MQ_extract_nested_mh (msg); + if ( (NULL != nested_context) && + (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) ) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + /** * Handle a request for a set operation from another peer. Checks if we * have a listener waiting for such a request (and in that case initiates @@ -744,42 +779,23 @@ incoming_suggest (struct Operation *incoming, * our virtual table and subsequent msgs would be routed differently (as * we then know what type of operation this is). * - * @param op the operation state - * @param mh the received message + * @param cls the operation state + * @param msg the received message * @return #GNUNET_OK if the channel should be kept alive, * #GNUNET_SYSERR to destroy the channel */ -static int -handle_incoming_msg (struct Operation *op, - const struct GNUNET_MessageHeader *mh) +static void +handle_incoming_msg (void *cls, + const struct OperationRequestMessage *msg) { - const struct OperationRequestMessage *msg; + struct Operation *op = cls; struct Listener *listener = op->listener; struct OperationSpecification *spec; const struct GNUNET_MessageHeader *nested_context; - msg = (const struct OperationRequestMessage *) mh; GNUNET_assert (GNUNET_YES == op->is_incoming); - if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - /* double operation request */ - if (NULL != op->spec) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } spec = GNUNET_new (struct OperationSpecification); nested_context = GNUNET_MQ_extract_nested_mh (msg); - if ( (NULL != nested_context) && - (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) ) - { - GNUNET_break_op (0); - GNUNET_free (spec); - return GNUNET_SYSERR; - } /* Make a copy of the nested_context (application-specific context information that is opaque to set) so we can pass it to the listener later on */ @@ -792,7 +808,6 @@ handle_incoming_msg (struct Operation *op, spec->peer = op->peer; spec->remote_element_count = ntohl (msg->element_count); op->spec = spec; - listener = op->listener; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received P2P operation request (op %u, port %s) for active listener\n", @@ -800,7 +815,6 @@ handle_incoming_msg (struct Operation *op, GNUNET_h2s (&listener->app_id)); incoming_suggest (op, listener); - return GNUNET_OK; } @@ -1103,9 +1117,11 @@ handle_client_create_set (void *cls, { case GNUNET_SET_OPERATION_INTERSECTION: set->vt = _GSS_intersection_vt (); + set->type = OT_INTERSECTION; break; case GNUNET_SET_OPERATION_UNION: set->vt = _GSS_union_vt (); + set->type = OT_UNION; break; default: GNUNET_free (set); @@ -1196,7 +1212,6 @@ channel_new_cb (void *cls, const struct GNUNET_PeerIdentity *source) { static const struct SetVT incoming_vt = { - .msg_handler = &handle_incoming_msg, .peer_disconnect = &handle_incoming_disconnect }; struct Listener *listener = cls; @@ -1290,60 +1305,6 @@ channel_window_cb (void *cls, /* FIXME: not implemented, we could do flow control here... */ } -/** - * FIXME: hack-job. Migrate to proper handler array use! - * - * @param cls local state associated with the channel. - * @param message The actual message. - */ -static int -check_p2p_message (void *cls, - const struct GNUNET_MessageHeader *message) -{ - return GNUNET_OK; -} - - -/** - * FIXME: hack-job. Migrate to proper handler array use! - * - * Functions with this signature are called whenever a message is - * received via a cadet channel. - * - * The msg_handler is a virtual table set in initially either when a peer - * creates a new channel with us, or once we create a new channel - * ourselves (evaluate). - * - * Once we know the exact type of operation (union/intersection), the vt is - * replaced with an operation specific instance (_GSS_[op]_vt). - * - * @param cls local state associated with the channel. - * @param message The actual message. - */ -static void -handle_p2p_message (void *cls, - const struct GNUNET_MessageHeader *message) -{ - struct Operation *op = cls; - int ret; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Dispatching cadet message (type: %u)\n", - ntohs (message->type)); - /* do this before the handler, as the handler might kill the channel */ - GNUNET_CADET_receive_done (op->channel); - if (NULL != op->vt) - ret = op->vt->msg_handler (op, - message); - else - ret = GNUNET_SYSERR; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Handled cadet message (type: %u)\n", - ntohs (message->type)); - if (GNUNET_OK != ret) - GNUNET_CADET_channel_destroy (op->channel); -} - /** * Called when a client wants to create a new listener. @@ -1357,66 +1318,66 @@ handle_client_listen (void *cls, { struct GNUNET_SERVICE_Client *client = cls; struct GNUNET_MQ_MessageHandler cadet_handlers[] = { - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_var_size (incoming_msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, - struct GNUNET_MessageHeader, + struct OperationRequestMessage, NULL), - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_var_size (union_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, - struct GNUNET_MessageHeader, + struct IBFMessage, NULL), - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_var_size (union_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, - struct GNUNET_MessageHeader, + struct GNUNET_SET_ElementMessage, NULL), - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_var_size (union_p2p_offer, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, struct GNUNET_MessageHeader, NULL), - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_var_size (union_p2p_inquiry, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, - struct GNUNET_MessageHeader, + struct InquiryMessage, NULL), - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_var_size (union_p2p_demand, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, struct GNUNET_MessageHeader, NULL), - GNUNET_MQ_hd_var_size (p2p_message, - GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, - struct GNUNET_MessageHeader, - NULL), - GNUNET_MQ_hd_var_size (p2p_message, - GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE, - struct GNUNET_MessageHeader, - NULL), - GNUNET_MQ_hd_var_size (p2p_message, - GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL, - struct GNUNET_MessageHeader, - NULL), - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_fixed_size (union_p2p_done, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_fixed_size (union_p2p_full_done, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_fixed_size (union_p2p_request_full, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, - struct GNUNET_MessageHeader, + struct StrataEstimatorMessage, NULL), - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, - struct GNUNET_MessageHeader, + struct StrataEstimatorMessage, NULL), - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_var_size (union_p2p_full_element, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT, - struct GNUNET_MessageHeader, + struct GNUNET_SET_ElementMessage, NULL), - GNUNET_MQ_hd_var_size (p2p_message, - GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, - struct GNUNET_MessageHeader, - NULL), - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, + struct IntersectionElementInfoMessage, + NULL), + GNUNET_MQ_hd_var_size (intersection_p2p_bf, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, - struct GNUNET_MessageHeader, - NULL), - GNUNET_MQ_hd_var_size (p2p_message, - GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, - struct GNUNET_MessageHeader, + struct BFMessage, NULL), + GNUNET_MQ_hd_fixed_size (intersection_p2p_done, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, + struct IntersectionDoneMessage, + NULL), GNUNET_MQ_handler_end () }; struct Listener *listener; @@ -1623,66 +1584,66 @@ handle_client_evaluate (void *cls, struct GNUNET_SERVICE_Client *client = cls; struct Operation *op = GNUNET_new (struct Operation); const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_var_size (incoming_msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, - struct GNUNET_MessageHeader, + struct OperationRequestMessage, op), - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_var_size (union_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, - struct GNUNET_MessageHeader, + struct IBFMessage, op), - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_var_size (union_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, - struct GNUNET_MessageHeader, + struct GNUNET_SET_ElementMessage, op), - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_var_size (union_p2p_offer, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, struct GNUNET_MessageHeader, op), - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_var_size (union_p2p_inquiry, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, - struct GNUNET_MessageHeader, + struct InquiryMessage, op), - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_var_size (union_p2p_demand, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, struct GNUNET_MessageHeader, op), - GNUNET_MQ_hd_var_size (p2p_message, - GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, - struct GNUNET_MessageHeader, - op), - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_fixed_size (union_p2p_done, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_fixed_size (union_p2p_full_done, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_fixed_size (union_p2p_request_full, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, - struct GNUNET_MessageHeader, + struct StrataEstimatorMessage, op), - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_var_size (union_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, - struct GNUNET_MessageHeader, - op), - GNUNET_MQ_hd_var_size (p2p_message, - GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE, - struct GNUNET_MessageHeader, - op), - GNUNET_MQ_hd_var_size (p2p_message, - GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL, - struct GNUNET_MessageHeader, + struct StrataEstimatorMessage, op), - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_var_size (union_p2p_full_element, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT, - struct GNUNET_MessageHeader, - op), - GNUNET_MQ_hd_var_size (p2p_message, - GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, - struct GNUNET_MessageHeader, + struct GNUNET_SET_ElementMessage, op), - GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, + struct IntersectionElementInfoMessage, + op), + GNUNET_MQ_hd_var_size (intersection_p2p_bf, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, - struct GNUNET_MessageHeader, - op), - GNUNET_MQ_hd_var_size (p2p_message, - GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, - struct GNUNET_MessageHeader, + struct BFMessage, op), + GNUNET_MQ_hd_fixed_size (intersection_p2p_done, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, + struct IntersectionDoneMessage, + op), GNUNET_MQ_handler_end () }; struct Set *set; @@ -1717,7 +1678,7 @@ handle_client_evaluate (void *cls, // mutations won't interfer with the running operation. op->generation_created = set->current_generation; advance_generation (set); - + op->type = set->type; op->vt = set->vt; GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, @@ -1886,9 +1847,11 @@ handle_client_copy_lazy_connect (void *cls, { case GNUNET_SET_OPERATION_INTERSECTION: set->vt = _GSS_intersection_vt (); + set->type = OT_INTERSECTION; break; case GNUNET_SET_OPERATION_UNION: set->vt = _GSS_union_vt (); + set->type = OT_UNION; break; default: GNUNET_assert (0); @@ -2057,6 +2020,7 @@ handle_client_accept (void *cls, advance_generation (set); op->vt = set->vt; + op->type = set->type; op->vt->accept (op); GNUNET_SERVICE_client_continue (client); } diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h index 68d8fe81f..c981430ef 100644 --- a/src/set/gnunet-service-set.h +++ b/src/set/gnunet-service-set.h @@ -212,20 +212,6 @@ typedef void const struct GNUNET_MessageHeader *opaque_context); -/** - * Signature of functions that implement the message handling for - * the different set operations. - * - * @param op operation state - * @param msg received message - * @return #GNUNET_OK on success, #GNUNET_SYSERR to - * destroy the operation and the tunnel - */ -typedef int -(*MsgHandlerImpl) (struct Operation *op, - const struct GNUNET_MessageHeader *msg); - - /** * Signature of functions that implement operation cancellation * @@ -275,11 +261,6 @@ struct SetVT */ DestroySetImpl destroy_set; - /** - * Callback for handling operation-specific messages. - */ - MsgHandlerImpl msg_handler; - /** * Callback for handling the remote peer's disconnect. */ @@ -363,6 +344,27 @@ struct ElementEntry struct Listener; +/** + * Possible set operations. + */ +enum OperationType { + /** + * Operation type unknown. + */ + OT_UNKNOWN = 0, + + /** + * We are performing a union. + */ + OT_UNION, + + /** + * We are performing an intersection. + */ + OT_INTERSECTION +}; + + /** * Operation context used to execute a set operation. */ @@ -426,6 +428,11 @@ struct Operation */ struct GNUNET_SCHEDULER_Task *timeout_task; + /** + * What type of operation is this? + */ + enum OperationType type; + /** * Unique request id for the request from a remote peer, sent to the * client, which will accept or reject the request. Set to '0' iff @@ -581,6 +588,11 @@ struct Set */ struct Operation *ops_tail; + /** + * What type of operation is this set for? + */ + enum OperationType type; + /** * Current generation, that is, number of previously executed * operations and lazy copies on the underlying set content. diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index 9fe1eabe6..b298f7b41 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - Copyright (C) 2013, 2014 GNUnet e.V. + Copyright (C) 2013-2017 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -28,6 +28,7 @@ #include "gnunet-service-set.h" #include "gnunet_block_lib.h" #include "gnunet-service-set_protocol.h" +#include "gnunet-service-set_intersection.h" #include @@ -550,6 +551,8 @@ send_remaining_elements (void *cls) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending done and destroy because iterator ran out\n"); op->keep--; + GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter); + op->state->full_result_iter = NULL; send_client_done_and_destroy (op); return; } @@ -627,9 +630,6 @@ process_bf (struct Operation *op) case PHASE_COUNT_SENT: /* This is the first BF being sent, build our initial map with filtering in place */ - op->state->my_elements - = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count, - GNUNET_YES); op->state->my_element_count = 0; GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, &filtered_map_initialization, @@ -664,42 +664,54 @@ process_bf (struct Operation *op) } +/** + * Check an BF message from a remote peer. + * + * @param cls the intersection operation + * @param msg the header of the message + * @return #GNUNET_OK if @a msg is well-formed + */ +int +check_intersection_p2p_bf (void *cls, + const struct BFMessage *msg) +{ + struct Operation *op = cls; + + if (OT_INTERSECTION != op->type) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + /** * Handle an BF message from a remote peer. * * @param cls the intersection operation - * @param mh the header of the message + * @param msg the header of the message */ -static void -handle_p2p_bf (void *cls, - const struct GNUNET_MessageHeader *mh) +void +handle_intersection_p2p_bf (void *cls, + const struct BFMessage *msg) { struct Operation *op = cls; - const struct BFMessage *msg; uint32_t bf_size; uint32_t chunk_size; uint32_t bf_bits_per_element; - uint16_t msize; - msize = htons (mh->size); - if (msize < sizeof (struct BFMessage)) - { - GNUNET_break_op (0); - fail_intersection_operation (op); - return; - } - msg = (const struct BFMessage *) mh; switch (op->state->phase) { case PHASE_INITIAL: GNUNET_break_op (0); fail_intersection_operation (op); - break; + return; case PHASE_COUNT_SENT: case PHASE_BF_EXCHANGE: bf_size = ntohl (msg->bloomfilter_total_length); bf_bits_per_element = ntohl (msg->bits_per_element); - chunk_size = msize - sizeof (struct BFMessage); + chunk_size = htons (msg->header.size) - sizeof (struct BFMessage); op->state->other_xor = msg->element_xor_hash; if (bf_size == chunk_size) { @@ -717,7 +729,7 @@ handle_p2p_bf (void *cls, op->state->salt = ntohl (msg->sender_mutator); op->spec->remote_element_count = ntohl (msg->sender_element_count); process_bf (op); - return; + break; } /* multipart chunk */ if (NULL == op->state->bf_data) @@ -764,8 +776,9 @@ handle_p2p_bf (void *cls, default: GNUNET_break_op (0); fail_intersection_operation (op); - break; + return; } + GNUNET_CADET_receive_done (op->channel); } @@ -836,6 +849,7 @@ static void begin_bf_exchange (struct Operation *op) { op->state->phase = PHASE_BF_EXCHANGE; + GNUNET_assert (NULL == op->state->my_elements); op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, GNUNET_YES); @@ -853,20 +867,18 @@ begin_bf_exchange (struct Operation *op) * @param cls the intersection operation * @param mh the header of the message */ -static void -handle_p2p_element_info (void *cls, - const struct GNUNET_MessageHeader *mh) +void +handle_intersection_p2p_element_info (void *cls, + const struct IntersectionElementInfoMessage *msg) { struct Operation *op = cls; - const struct IntersectionElementInfoMessage *msg; - if (ntohs (mh->size) != sizeof (struct IntersectionElementInfoMessage)) + if (OT_INTERSECTION != op->type) { GNUNET_break_op (0); fail_intersection_operation(op); return; } - msg = (const struct IntersectionElementInfoMessage *) mh; op->spec->remote_element_count = ntohl (msg->sender_element_count); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received remote element count (%u), I have %u\n", @@ -884,6 +896,7 @@ handle_p2p_element_info (void *cls, } GNUNET_break (NULL == op->state->remote_bf); begin_bf_exchange (op); + GNUNET_CADET_receive_done (op->channel); } @@ -955,28 +968,26 @@ filter_all (void *cls, * @param cls the intersection operation * @param mh the message */ -static void -handle_p2p_done (void *cls, - const struct GNUNET_MessageHeader *mh) +void +handle_intersection_p2p_done (void *cls, + const struct IntersectionDoneMessage *idm) { struct Operation *op = cls; - const struct IntersectionDoneMessage *idm; - if (PHASE_BF_EXCHANGE != op->state->phase) + if (OT_INTERSECTION != op->type) { - /* wrong phase to conclude? FIXME: Or should we allow this - if the other peer has _initially_ already an empty set? */ GNUNET_break_op (0); - fail_intersection_operation (op); + fail_intersection_operation(op); return; } - if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage)) + if (PHASE_BF_EXCHANGE != op->state->phase) { + /* wrong phase to conclude? FIXME: Or should we allow this + if the other peer has _initially_ already an empty set? */ GNUNET_break_op (0); fail_intersection_operation (op); return; } - idm = (const struct IntersectionDoneMessage *) mh; if (0 == ntohl (idm->final_element_count)) { /* other peer determined empty set is the intersection, @@ -1000,6 +1011,7 @@ handle_p2p_done (void *cls, op->state->my_element_count); op->state->phase = PHASE_FINISHED; finish_and_destroy (op); + GNUNET_CADET_receive_done (op->channel); } @@ -1064,11 +1076,11 @@ intersection_accept (struct Operation *op) op->state->phase = PHASE_INITIAL; op->state->my_element_count = op->spec->set->state->current_set_element_count; + GNUNET_assert (NULL == op->state->my_elements); op->state->my_elements - = GNUNET_CONTAINER_multihashmap_create - (GNUNET_MIN (op->state->my_element_count, - op->spec->remote_element_count), - GNUNET_YES); + = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (op->state->my_element_count, + op->spec->remote_element_count), + GNUNET_YES); if (op->spec->remote_element_count < op->state->my_element_count) { /* If the other peer (Alice) has fewer elements than us (Bob), @@ -1082,43 +1094,6 @@ intersection_accept (struct Operation *op) } -/** - * Dispatch messages for a intersection operation. - * - * @param op the state of the intersection evaluate operation - * @param mh the received message - * @return #GNUNET_SYSERR if the tunnel should be disconnected, - * #GNUNET_OK otherwise - */ -static int -intersection_handle_p2p_message (struct Operation *op, - const struct GNUNET_MessageHeader *mh) -{ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received p2p message (t: %u, s: %u)\n", - ntohs (mh->type), ntohs (mh->size)); - switch (ntohs (mh->type)) - { - /* this message handler is not active until after we received an - * operation request message, thus the ops request is not handled here - */ - case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO: - handle_p2p_element_info (op, mh); - break; - case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF: - handle_p2p_bf (op, mh); - break; - case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE: - handle_p2p_done (op, mh); - break; - default: - /* something wrong with cadet's message handlers? */ - GNUNET_assert (0); - } - return GNUNET_OK; -} - - /** * Handler for peer-disconnects, notifies the client about the aborted * operation. If we did not expect anything from the other peer, we @@ -1168,6 +1143,11 @@ intersection_op_cancel (struct Operation *op) GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements); op->state->my_elements = NULL; } + if (NULL != op->state->full_result_iter) + { + GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter); + op->state->full_result_iter = NULL; + } GNUNET_free (op->state); op->state = NULL; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1245,7 +1225,6 @@ _GSS_intersection_vt () { static const struct SetVT intersection_vt = { .create = &intersection_set_create, - .msg_handler = &intersection_handle_p2p_message, .add = &intersection_add, .remove = &intersection_remove, .destroy_set = &intersection_set_destroy, diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index b5b602074..200bd4b8e 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - Copyright (C) 2013-2016 GNUnet e.V. + Copyright (C) 2013-2017 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -19,15 +19,16 @@ */ /** * @file set/gnunet-service-set_union.c - * @brief two-peer set operations * @author Florian Dold + * @author Christian Grothoff */ #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_statistics_service.h" #include "gnunet-service-set.h" #include "ibf.h" +#include "gnunet-service-set_union.h" #include "gnunet-service-set_union_strata_estimator.h" #include "gnunet-service-set_protocol.h" #include @@ -813,42 +814,56 @@ send_full_set (struct Operation *op) * Handle a strata estimator from a remote peer * * @param cls the union operation - * @param mh the message - * @param is_compressed #GNUNET_YES if the estimator is compressed - * @return #GNUNET_SYSERR if the tunnel should be disconnected, - * #GNUNET_OK otherwise + * @param msg the message */ -static int -handle_p2p_strata_estimator (void *cls, - const struct GNUNET_MessageHeader *mh, - int is_compressed) +int +check_union_p2p_strata_estimator (void *cls, + const struct StrataEstimatorMessage *msg) { struct Operation *op = cls; - struct StrataEstimator *remote_se; - struct StrataEstimatorMessage *msg = (void *) mh; - unsigned int diff; - uint64_t other_size; + int is_compressed; size_t len; - GNUNET_STATISTICS_update (_GSS_statistics, - "# bytes of SE received", - ntohs (mh->size), - GNUNET_NO); - if (op->state->phase != PHASE_EXPECT_SE) { GNUNET_break (0); - fail_union_operation (op); return GNUNET_SYSERR; } - len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage); + is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type)); + len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage); if ( (GNUNET_NO == is_compressed) && (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) ) { - fail_union_operation (op); GNUNET_break (0); return GNUNET_SYSERR; } + return GNUNET_OK; +} + + +/** + * Handle a strata estimator from a remote peer + * + * @param cls the union operation + * @param msg the message + */ +void +handle_union_p2p_strata_estimator (void *cls, + const struct StrataEstimatorMessage *msg) +{ + struct Operation *op = cls; + struct StrataEstimator *remote_se; + unsigned int diff; + uint64_t other_size; + size_t len; + int is_compressed; + + is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type)); + GNUNET_STATISTICS_update (_GSS_statistics, + "# bytes of SE received", + ntohs (msg->header.size), + GNUNET_NO); + len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage); other_size = GNUNET_ntohll (msg->set_size); remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, @@ -857,7 +872,7 @@ handle_p2p_strata_estimator (void *cls, { /* insufficient resources, fail */ fail_union_operation (op); - return GNUNET_SYSERR; + return; } if (GNUNET_OK != strata_estimator_read (&msg[1], @@ -866,18 +881,16 @@ handle_p2p_strata_estimator (void *cls, remote_se)) { /* decompression failed */ - fail_union_operation (op); strata_estimator_destroy (remote_se); - return GNUNET_SYSERR; + fail_union_operation (op); + return; } GNUNET_assert (NULL != op->state->se); diff = strata_estimator_difference (remote_se, op->state->se); if (diff > 200) - diff = diff * 3 / 2; - - + diff = diff * 3 / 2; strata_estimator_destroy (remote_se); strata_estimator_destroy (op->state->se); @@ -885,12 +898,14 @@ handle_p2p_strata_estimator (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n", diff, - 1<spec->byzantine) && (other_size < op->spec->byzantine_lower_bound)) + if ( (GNUNET_YES == op->spec->byzantine) && + (other_size < op->spec->byzantine_lower_bound) ) { GNUNET_break (0); fail_union_operation (op); - return GNUNET_SYSERR; + return; } - - if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 4)) + if ( (GNUNET_YES == op->spec->force_full) || + (diff > op->state->initial_size / 4)) { LOG (GNUNET_ERROR_TYPE_INFO, "Sending full set (diff=%d, own set=%u)\n", @@ -923,6 +939,7 @@ handle_p2p_strata_estimator (void *cls, else { struct GNUNET_MQ_Envelope *ev; + op->state->phase = PHASE_EXPECT_IBF; ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL); GNUNET_MQ_send (op->mq, ev); @@ -942,11 +959,10 @@ handle_p2p_strata_estimator (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to send IBF, closing connection\n"); fail_union_operation (op); - return GNUNET_SYSERR; + return; } } - - return GNUNET_OK; + GNUNET_CADET_receive_done (op->channel); } @@ -1164,99 +1180,116 @@ decode_and_send (struct Operation *op) /** - * Handle an IBF message from a remote peer. + * Check an IBF message from a remote peer. * * Reassemble the IBF from multiple pieces, and * process the whole IBF once possible. * * @param cls the union operation - * @param mh the header of the message - * @return #GNUNET_SYSERR if the tunnel should be disconnected, - * #GNUNET_OK otherwise + * @param msg the header of the message + * @return #GNUNET_OK if @a msg is well-formed */ -static int -handle_p2p_ibf (void *cls, - const struct GNUNET_MessageHeader *mh) +int +check_union_p2p_ibf (void *cls, + const struct IBFMessage *msg) { struct Operation *op = cls; - const struct IBFMessage *msg; unsigned int buckets_in_message; - if (ntohs (mh->size) < sizeof (struct IBFMessage)) + if (OT_UNION != op->type) { GNUNET_break_op (0); - fail_union_operation (op); return GNUNET_SYSERR; } - msg = (const struct IBFMessage *) mh; - if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) || - (op->state->phase == PHASE_EXPECT_IBF) ) + buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; + if (0 == buckets_in_message) { - op->state->phase = PHASE_EXPECT_IBF_CONT; - GNUNET_assert (NULL == op->state->remote_ibf); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Creating new ibf of size %u\n", - 1 << msg->order); - op->state->remote_ibf = ibf_create (1<order, SE_IBF_HASH_NUM); - op->state->salt_receive = ntohl (msg->salt); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive); - if (NULL == op->state->remote_ibf) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to parse remote IBF, closing connection\n"); - fail_union_operation (op); - return GNUNET_SYSERR; - } - op->state->ibf_buckets_received = 0; - if (0 != ntohl (msg->offset)) - { - GNUNET_break_op (0); - fail_union_operation (op); - return GNUNET_SYSERR; - } + GNUNET_break_op (0); + return GNUNET_SYSERR; } - else if (op->state->phase == PHASE_EXPECT_IBF_CONT) + if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if (op->state->phase == PHASE_EXPECT_IBF_CONT) { if (ntohl (msg->offset) != op->state->ibf_buckets_received) { GNUNET_break_op (0); - fail_union_operation (op); return GNUNET_SYSERR; } if (1<order != op->state->remote_ibf->size) { GNUNET_break_op (0); - fail_union_operation (op); return GNUNET_SYSERR; } if (ntohl (msg->salt) != op->state->salt_receive) { GNUNET_break_op (0); - fail_union_operation (op); return GNUNET_SYSERR; } } - else + else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) && + (op->state->phase != PHASE_EXPECT_IBF) ) { - GNUNET_assert (0); + GNUNET_break_op (0); + return GNUNET_SYSERR; } - buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; + return GNUNET_OK; +} - if (0 == buckets_in_message) + +/** + * Handle an IBF message from a remote peer. + * + * Reassemble the IBF from multiple pieces, and + * process the whole IBF once possible. + * + * @param cls the union operation + * @param msg the header of the message + */ +void +handle_union_p2p_ibf (void *cls, + const struct IBFMessage *msg) +{ + struct Operation *op = cls; + unsigned int buckets_in_message; + + buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; + if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) || + (op->state->phase == PHASE_EXPECT_IBF) ) { - GNUNET_break_op (0); - fail_union_operation (op); - return GNUNET_SYSERR; + op->state->phase = PHASE_EXPECT_IBF_CONT; + GNUNET_assert (NULL == op->state->remote_ibf); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating new ibf of size %u\n", + 1 << msg->order); + op->state->remote_ibf = ibf_create (1<order, SE_IBF_HASH_NUM); + op->state->salt_receive = ntohl (msg->salt); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Receiving new IBF with salt %u\n", + op->state->salt_receive); + if (NULL == op->state->remote_ibf) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to parse remote IBF, closing connection\n"); + fail_union_operation (op); + return; + } + op->state->ibf_buckets_received = 0; + if (0 != ntohl (msg->offset)) + { + GNUNET_break_op (0); + fail_union_operation (op); + return; + } } - - if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) + else { - GNUNET_break_op (0); - fail_union_operation (op); - return GNUNET_SYSERR; + GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT); } - GNUNET_assert (NULL != op->state->remote_ibf); ibf_read_slice (&msg[1], @@ -1276,10 +1309,11 @@ handle_p2p_ibf (void *cls, /* Internal error, best we can do is shut down */ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to decode IBF, closing connection\n"); - return GNUNET_SYSERR; + fail_union_operation (op); + return; } } - return GNUNET_OK; + GNUNET_CADET_receive_done (op->channel); } @@ -1343,6 +1377,11 @@ send_done_and_destroy (void *cls) } +/** + * Tests if the operation is finished, and if so notify. + * + * @param op operation to check + */ static void maybe_finish (struct Operation *op) { @@ -1382,46 +1421,59 @@ maybe_finish (struct Operation *op) /** - * Handle an element message from a remote peer. - * Sent by the other peer either because we decoded an IBF and placed a demand, - * or because the other peer switched to full set transmission. + * Check an element message from a remote peer. * * @param cls the union operation - * @param mh the message + * @param emsg the message */ -static void -handle_p2p_elements (void *cls, - const struct GNUNET_MessageHeader *mh) +int +check_union_p2p_elements (void *cls, + const struct GNUNET_SET_ElementMessage *emsg) { struct Operation *op = cls; - struct ElementEntry *ee; - const struct GNUNET_SET_ElementMessage *emsg; - uint16_t element_size; - if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes)) + if (OT_UNION != op->type) { GNUNET_break_op (0); - fail_union_operation (op); - return; + return GNUNET_SYSERR; } - if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) + if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes)) { GNUNET_break_op (0); - fail_union_operation (op); - return; + return GNUNET_SYSERR; } + return GNUNET_OK; +} + - emsg = (const struct GNUNET_SET_ElementMessage *) mh; +/** + * Handle an element message from a remote peer. + * Sent by the other peer either because we decoded an IBF and placed a demand, + * or because the other peer switched to full set transmission. + * + * @param cls the union operation + * @param emsg the message + */ +void +handle_union_p2p_elements (void *cls, + const struct GNUNET_SET_ElementMessage *emsg) +{ + struct Operation *op = cls; + struct ElementEntry *ee; + struct KeyEntry *ke; + uint16_t element_size; - element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); + element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage); ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); - GNUNET_memcpy (&ee[1], &emsg[1], element_size); + GNUNET_memcpy (&ee[1], + &emsg[1], + element_size); ee->element.size = element_size; ee->element.data = &ee[1]; ee->element.element_type = ntohs (emsg->element_type); ee->remote = GNUNET_YES; - GNUNET_SET_element_hash (&ee->element, &ee->element_hash); - + GNUNET_SET_element_hash (&ee->element, + &ee->element_hash); if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, &ee->element_hash, @@ -1429,7 +1481,6 @@ handle_p2p_elements (void *cls, { /* We got something we didn't demand, since it's not in our map. */ GNUNET_break_op (0); - GNUNET_free (ee); fail_union_operation (op); return; } @@ -1448,10 +1499,9 @@ handle_p2p_elements (void *cls, 1, GNUNET_NO); - op->state->received_total += 1; - - struct KeyEntry *ke = op_get_element (op, &ee->element_hash); + op->state->received_total++; + ke = op_get_element (op, &ee->element_hash); if (NULL != ke) { /* Got repeated element. Should not happen since @@ -1467,7 +1517,7 @@ handle_p2p_elements (void *cls, { LOG (GNUNET_ERROR_TYPE_DEBUG, "Registering new element from remote peer\n"); - op->state->received_fresh += 1; + op->state->received_fresh++; op_register_element (op, ee, GNUNET_YES); /* only send results immediately if the client wants it */ switch (op->spec->result_mode) @@ -1485,43 +1535,57 @@ handle_p2p_elements (void *cls, } } - if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3) + if ( (op->state->received_total > 8) && + (op->state->received_fresh < op->state->received_total / 3) ) { /* The other peer gave us lots of old elements, there's something wrong. */ GNUNET_break_op (0); fail_union_operation (op); return; } - + GNUNET_CADET_receive_done (op->channel); maybe_finish (op); } /** - * Handle an element message from a remote peer. + * Check a full element message from a remote peer. * * @param cls the union operation - * @param mh the message + * @param emsg the message */ -static void -handle_p2p_full_element (void *cls, - const struct GNUNET_MessageHeader *mh) +int +check_union_p2p_full_element (void *cls, + const struct GNUNET_SET_ElementMessage *emsg) { struct Operation *op = cls; - struct ElementEntry *ee; - const struct GNUNET_SET_ElementMessage *emsg; - uint16_t element_size; - if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) + if (OT_UNION != op->type) { GNUNET_break_op (0); - fail_union_operation (op); - return; + return GNUNET_SYSERR; } + // FIXME: check that we expect full elements here? + return GNUNET_OK; +} - emsg = (const struct GNUNET_SET_ElementMessage *) mh; - element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); +/** + * Handle an element message from a remote peer. + * + * @param cls the union operation + * @param emsg the message + */ +void +handle_union_p2p_full_element (void *cls, + const struct GNUNET_SET_ElementMessage *emsg) +{ + struct Operation *op = cls; + struct ElementEntry *ee; + struct KeyEntry *ke; + uint16_t element_size; + + element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage); ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); GNUNET_memcpy (&ee[1], &emsg[1], element_size); ee->element.size = element_size; @@ -1544,10 +1608,9 @@ handle_p2p_full_element (void *cls, 1, GNUNET_NO); - op->state->received_total += 1; - - struct KeyEntry *ke = op_get_element (op, &ee->element_hash); + op->state->received_total++; + ke = op_get_element (op, &ee->element_hash); if (NULL != ke) { /* Got repeated element. Should not happen since @@ -1563,7 +1626,7 @@ handle_p2p_full_element (void *cls, { LOG (GNUNET_ERROR_TYPE_DEBUG, "Registering new element from remote peer\n"); - op->state->received_fresh += 1; + op->state->received_fresh++; op_register_element (op, ee, GNUNET_YES); /* only send results immediately if the client wants it */ switch (op->spec->result_mode) @@ -1581,8 +1644,8 @@ handle_p2p_full_element (void *cls, } } - if ( (GNUNET_YES == op->spec->byzantine) && - (op->state->received_total > 384 + op->state->received_fresh * 4) && + if ( (GNUNET_YES == op->spec->byzantine) && + (op->state->received_total > 384 + op->state->received_fresh * 4) && (op->state->received_fresh < op->state->received_total / 6) ) { /* The other peer gave us lots of old elements, there's something wrong. */ @@ -1594,51 +1657,73 @@ handle_p2p_full_element (void *cls, fail_union_operation (op); return; } + GNUNET_CADET_receive_done (op->channel); } + /** * Send offers (for GNUNET_Hash-es) in response * to inquiries (for IBF_Key-s). * * @param cls the union operation - * @param mh the message + * @param msg the message */ -static void -handle_p2p_inquiry (void *cls, - const struct GNUNET_MessageHeader *mh) +int +check_union_p2p_inquiry (void *cls, + const struct InquiryMessage *msg) { struct Operation *op = cls; - const struct IBF_Key *ibf_key; unsigned int num_keys; - struct InquiryMessage *msg; - /* look up elements and send them */ + if (OT_UNION != op->type) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } if (op->state->phase != PHASE_INVENTORY_PASSIVE) { GNUNET_break_op (0); - fail_union_operation (op); - return; + return GNUNET_SYSERR; } - num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage)) - / sizeof (struct IBF_Key); - if ((ntohs (mh->size) - sizeof (struct InquiryMessage)) + num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage)) + / sizeof (struct IBF_Key); + if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage)) != num_keys * sizeof (struct IBF_Key)) { GNUNET_break_op (0); - fail_union_operation (op); - return; + return GNUNET_SYSERR; } + return GNUNET_OK; +} - msg = (struct InquiryMessage *) mh; +/** + * Send offers (for GNUNET_Hash-es) in response + * to inquiries (for IBF_Key-s). + * + * @param cls the union operation + * @param msg the message + */ +void +handle_union_p2p_inquiry (void *cls, + const struct InquiryMessage *msg) +{ + struct Operation *op = cls; + const struct IBF_Key *ibf_key; + unsigned int num_keys; + + num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage)) + / sizeof (struct IBF_Key); ibf_key = (const struct IBF_Key *) &msg[1]; while (0 != num_keys--) { struct IBF_Key unsalted_key; + unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key); send_offers_for_key (op, unsalted_key); ibf_key++; } + GNUNET_CADET_receive_done (op->channel); } @@ -1677,27 +1762,34 @@ send_missing_elements_iter (void *cls, /** - * Handle a + * Handle a request for full set transmission. * * @parem cls closure, a set union operation * @param mh the demand message */ -static void -handle_p2p_request_full (void *cls, - const struct GNUNET_MessageHeader *mh) +void +handle_union_p2p_request_full (void *cls, + const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; - if (PHASE_EXPECT_IBF != op->state->phase) + if (OT_UNION != op->type) { + GNUNET_break_op (0); fail_union_operation (op); + return; + } + if (PHASE_EXPECT_IBF != op->state->phase) + { GNUNET_break_op (0); + fail_union_operation (op); return; } // FIXME: we need to check that our set is larger than the // byzantine_lower_bound by some threshold send_full_set (op); + GNUNET_CADET_receive_done (op->channel); } @@ -1707,56 +1799,97 @@ handle_p2p_request_full (void *cls, * @parem cls closure, a set union operation * @param mh the demand message */ -static void -handle_p2p_full_done (void *cls, - const struct GNUNET_MessageHeader *mh) +void +handle_union_p2p_full_done (void *cls, + const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; - if (PHASE_EXPECT_IBF == op->state->phase) + switch (op->state->phase) { - struct GNUNET_MQ_Envelope *ev; - - LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n"); + case PHASE_EXPECT_IBF: + { + struct GNUNET_MQ_Envelope *ev; - /* send all the elements that did not come from the remote peer */ - GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, - &send_missing_elements_iter, - op); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "got FULL DONE, sending elements that other peer is missing\n"); - ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); - GNUNET_MQ_send (op->mq, ev); - op->state->phase = PHASE_DONE; + /* send all the elements that did not come from the remote peer */ + GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, + &send_missing_elements_iter, + op); - /* we now wait until the other peer shuts the tunnel down*/ + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); + GNUNET_MQ_send (op->mq, ev); + op->state->phase = PHASE_DONE; + /* we now wait until the other peer shuts the tunnel down*/ + } + break; + case PHASE_FULL_SENDING: + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "got FULL DONE, finishing\n"); + /* We sent the full set, and got the response for that. We're done. */ + op->state->phase = PHASE_DONE; + GNUNET_CADET_receive_done (op->channel); + send_done_and_destroy (op); + return; + } + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Handle full done phase is %u\n", + (unsigned) op->state->phase); + GNUNET_break_op (0); + fail_union_operation (op); + return; } - else if (PHASE_FULL_SENDING == op->state->phase) + GNUNET_CADET_receive_done (op->channel); +} + + +/** + * Check a demand by the other peer for elements based on a list + * of `struct GNUNET_HashCode`s. + * + * @parem cls closure, a set union operation + * @param mh the demand message + * @return #GNUNET_OK if @a mh is well-formed + */ +int +check_union_p2p_demand (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + unsigned int num_hashes; + + if (OT_UNION != op->type) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n"); - /* We sent the full set, and got the response for that. We're done. */ - op->state->phase = PHASE_DONE; - send_done_and_destroy (op); + GNUNET_break_op (0); + return GNUNET_SYSERR; } - else + num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + / sizeof (struct GNUNET_HashCode); + if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + != num_hashes * sizeof (struct GNUNET_HashCode)) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase); GNUNET_break_op (0); - fail_union_operation (op); - return; + return GNUNET_SYSERR; } + return GNUNET_OK; } /** * Handle a demand by the other peer for elements based on a list - * of GNUNET_HashCode-s. + * of `struct GNUNET_HashCode`s. * * @parem cls closure, a set union operation * @param mh the demand message */ -static void -handle_p2p_demand (void *cls, - const struct GNUNET_MessageHeader *mh) +void +handle_union_p2p_demand (void *cls, + const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; struct ElementEntry *ee; @@ -1767,19 +1900,12 @@ handle_p2p_demand (void *cls, num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) / sizeof (struct GNUNET_HashCode); - if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) - != num_hashes * sizeof (struct GNUNET_HashCode)) - { - GNUNET_break_op (0); - fail_union_operation (op); - return; - } - for (hash = (const struct GNUNET_HashCode *) &mh[1]; num_hashes > 0; hash++, num_hashes--) { - ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash); + ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, + hash); if (NULL == ee) { /* Demand for non-existing element. */ @@ -1823,31 +1949,35 @@ handle_p2p_demand (void *cls, break; } } + GNUNET_CADET_receive_done (op->channel); } /** - * Handle offers (of GNUNET_HashCode-s) and - * respond with demands (of GNUNET_HashCode-s). + * Check offer (of `struct GNUNET_HashCode`s). * * @param cls the union operation * @param mh the message + * @return #GNUNET_OK if @a mh is well-formed */ -static void -handle_p2p_offer (void *cls, - const struct GNUNET_MessageHeader *mh) +int +check_union_p2p_offer (void *cls, + const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; - const struct GNUNET_HashCode *hash; unsigned int num_hashes; + if (OT_UNION != op->type) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } /* look up elements and send them */ if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) && (op->state->phase != PHASE_INVENTORY_ACTIVE)) { GNUNET_break_op (0); - fail_union_operation (op); - return; + return GNUNET_SYSERR; } num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) / sizeof (struct GNUNET_HashCode); @@ -1855,10 +1985,29 @@ handle_p2p_offer (void *cls, != num_hashes * sizeof (struct GNUNET_HashCode)) { GNUNET_break_op (0); - fail_union_operation (op); - return; + return GNUNET_SYSERR; } + return GNUNET_OK; +} + +/** + * Handle offers (of `struct GNUNET_HashCode`s) and + * respond with demands (of `struct GNUNET_HashCode`s). + * + * @param cls the union operation + * @param mh the message + */ +void +handle_union_p2p_offer (void *cls, + const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + const struct GNUNET_HashCode *hash; + unsigned int num_hashes; + + num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) + / sizeof (struct GNUNET_HashCode); for (hash = (const struct GNUNET_HashCode *) &mh[1]; num_hashes > 0; hash++, num_hashes--) @@ -1897,6 +2046,7 @@ handle_p2p_offer (void *cls, *(struct GNUNET_HashCode *) &demands[1] = *hash; GNUNET_MQ_send (op->mq, ev); } + GNUNET_CADET_receive_done (op->channel); } @@ -1906,16 +2056,22 @@ handle_p2p_offer (void *cls, * @param cls the union operation * @param mh the message */ -static void -handle_p2p_done (void *cls, - const struct GNUNET_MessageHeader *mh) +void +handle_union_p2p_done (void *cls, + const struct GNUNET_MessageHeader *mh) { struct Operation *op = cls; - if (op->state->phase == PHASE_INVENTORY_PASSIVE) + if (OT_UNION != op->type) { + GNUNET_break_op (0); + fail_union_operation (op); + return; + } + switch (op->state->phase) + { + case PHASE_INVENTORY_PASSIVE: /* We got all requests, but still have to send our elements in response. */ - op->state->phase = PHASE_FINISH_WAITING; LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1929,11 +2085,10 @@ handle_p2p_done (void *cls, * all our demands are satisfied, so that the active * peer can quit if we gave him everything. */ + GNUNET_CADET_receive_done (op->channel); maybe_finish (op); return; - } - if (op->state->phase == PHASE_INVENTORY_ACTIVE) - { + case PHASE_INVENTORY_ACTIVE: LOG (GNUNET_ERROR_TYPE_DEBUG, "got DONE (as active partner), waiting to finish\n"); /* All demands of the other peer are satisfied, @@ -1944,11 +2099,14 @@ handle_p2p_done (void *cls, * to the other peer once our demands are met. */ op->state->phase = PHASE_FINISH_CLOSING; + GNUNET_CADET_receive_done (op->channel); maybe_finish (op); return; + default: + GNUNET_break_op (0); + fail_union_operation (op); + return; } - GNUNET_break_op (0); - fail_union_operation (op); } @@ -2118,62 +2276,6 @@ union_set_destroy (struct SetState *set_state) } -/** - * Dispatch messages for a union operation. - * - * @param op the state of the union evaluate operation - * @param mh the received message - * @return #GNUNET_SYSERR if the tunnel should be disconnected, - * #GNUNET_OK otherwise - */ -int -union_handle_p2p_message (struct Operation *op, - const struct GNUNET_MessageHeader *mh) -{ - //LOG (GNUNET_ERROR_TYPE_DEBUG, - // "received p2p message (t: %u, s: %u)\n", - // ntohs (mh->type), - // ntohs (mh->size)); - switch (ntohs (mh->type)) - { - case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF: - return handle_p2p_ibf (op, mh); - case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE: - return handle_p2p_strata_estimator (op, mh, GNUNET_NO); - case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC: - return handle_p2p_strata_estimator (op, mh, GNUNET_YES); - case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: - handle_p2p_elements (op, mh); - break; - case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT: - handle_p2p_full_element (op, mh); - break; - case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY: - handle_p2p_inquiry (op, mh); - break; - case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE: - handle_p2p_done (op, mh); - break; - case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER: - handle_p2p_offer (op, mh); - break; - case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND: - handle_p2p_demand (op, mh); - break; - case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE: - handle_p2p_full_done (op, mh); - break; - case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL: - handle_p2p_request_full (op, mh); - break; - default: - /* Something wrong with cadet's message handlers? */ - GNUNET_assert (0); - } - return GNUNET_OK; -} - - /** * Handler for peer-disconnects, notifies the client * about the aborted operation in case the op was not concluded. @@ -2240,7 +2342,6 @@ _GSS_union_vt () { static const struct SetVT union_vt = { .create = &union_set_create, - .msg_handler = &union_handle_p2p_message, .add = &union_add, .remove = &union_remove, .destroy_set = &union_set_destroy, diff --git a/src/set/set_api.c b/src/set/set_api.c index 04a4e4910..bc428f9f6 100644 --- a/src/set/set_api.c +++ b/src/set/set_api.c @@ -76,6 +76,8 @@ struct GNUNET_SET_Handle /** * Should the set be destroyed once all operations are gone? + * #GNUNET_SYSERR if #GNUNET_SET_destroy() must raise this flag, + * #GNUNET_YES if #GNUNET_SET_destroy() did raise this flag. */ int destroy_requested; @@ -345,11 +347,13 @@ handle_iter_done (void *cls, if (NULL == iter) return; + set->destroy_requested = GNUNET_SYSERR; set->iterator = NULL; set->iteration_id++; iter (set->iterator_cls, NULL); - + if (GNUNET_SYSERR == set->destroy_requested) + set->destroy_requested = GNUNET_NO; if (GNUNET_YES == set->destroy_requested) GNUNET_SET_destroy (set); } @@ -736,7 +740,9 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) /* destroying set while iterator is active is currently not supported; we should expand the API to allow clients to explicitly cancel the iteration! */ - if ( (NULL != set->ops_head) || (NULL != set->iterator) ) + if ( (NULL != set->ops_head) || + (NULL != set->iterator) || + (GNUNET_SYSERR == set->destroy_requested) ) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Set operations are pending, delaying set destruction\n"); @@ -809,7 +815,7 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer, msg->force_delta = GNUNET_YES; break; default: - LOG (GNUNET_ERROR_TYPE_ERROR, + LOG (GNUNET_ERROR_TYPE_ERROR, "Option with type %d not recognized\n", (int) opt->type); } } diff --git a/src/set/test_set_union_copy.c b/src/set/test_set_union_copy.c index c887a8958..a1eba6311 100644 --- a/src/set/test_set_union_copy.c +++ b/src/set/test_set_union_copy.c @@ -122,6 +122,7 @@ check_count_iter (void *cls, return GNUNET_NO; } ci_cls->cont (ci_cls->cont_cls); + GNUNET_free (ci_cls); return GNUNET_NO; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,