From: Christian Fuchs Date: Wed, 18 Dec 2013 15:39:21 +0000 (+0000) Subject: - further work on multipart receiving X-Git-Tag: initial-import-from-subversion-38251~5324 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=f9c6eb1e0a52ee619997e8f3772d99457adac3b5;p=oweals%2Fgnunet.git - further work on multipart receiving - removed the multipart-state from the statemachine again, as we can recognize multipart sending based on wether or not the bf_data pointer is null or not - simplified & refactored the multipart message format a bit --- diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index 7152eec06..886d4c6dd 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c @@ -53,10 +53,6 @@ enum IntersectionOperationPhase * until one notices the their element count is equal */ PHASE_BF_EXCHANGE, - /** - * Multipart continuation of BF_exchange - */ - PHASE_BF_AWAIT_MULTIPART, /** * if both peers have an equal peercount, they enter this state for * one more turn, to see if they actually have agreed on a correct set. @@ -91,12 +87,17 @@ struct OperationState /** * for multipart msgs we have to store the bloomfilter-data until we fully sent it. */ - char * local_bf_data; + char * bf_data; /** * size of the bloomfilter */ - uint32_t local_bf_data_size; + uint32_t bf_data_size; + + /** + * size of the bloomfilter + */ + uint32_t bf_bits_per_element; /** * Current state of the operation. @@ -217,7 +218,6 @@ iterator_initialization (void *cls, { struct ElementEntry *ee = value; struct Operation *op = cls; - struct GNUNET_HashCode mutated_hash; //only consider this element, if it is valid for us if ((op->generation_created >= ee->generation_removed) @@ -366,24 +366,24 @@ send_bloomfilter_multipart (struct Operation *op, uint32_t offset) struct GNUNET_MQ_Envelope *ev; struct BFPart *msg; uint32_t chunk_size = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof(struct BFPart)); - uint32_t todo_size = op->state->local_bf_data_size - offset; + uint32_t todo_size = op->state->bf_data_size - offset; if (todo_size < chunk_size) chunk_size = todo_size; ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART); - msg->bloomfilter_length = htonl (chunk_size); - msg->bloomfilter_offset = htonl (offset); - memcpy(&msg[1], &op->state->local_bf_data[offset], chunk_size); + msg->chunk_length = htonl (chunk_size); + msg->chunk_offset = htonl (offset); + memcpy(&msg[1], &op->state->bf_data[offset], chunk_size); GNUNET_MQ_send (op->mq, ev); - if (op->state->local_bf_data_size == offset + chunk_size) + if (op->state->bf_data_size == offset + chunk_size) { // done - GNUNET_free(op->state->local_bf_data); - op->state->local_bf_data = NULL; + GNUNET_free(op->state->bf_data); + op->state->bf_data = NULL; return; } @@ -431,20 +431,20 @@ send_bloomfilter (struct Operation *op) ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); GNUNET_assert (GNUNET_SYSERR != GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, - &msg[1], + (char*)&msg[1], bf_size)); } else { //multipart chunk_size = GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BFMessage); ev = GNUNET_MQ_msg_extra (msg, chunk_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF); - op->state->local_bf_data = (char *) GNUNET_malloc (bf_size); + op->state->bf_data = (char *) GNUNET_malloc (bf_size); GNUNET_assert (GNUNET_SYSERR != GNUNET_CONTAINER_bloomfilter_get_raw_data (local_bf, - op->state->local_bf_data, + op->state->bf_data, bf_size)); - memcpy (&msg[1], op->state->local_bf_data, chunk_size); - op->state->local_bf_data_size = bf_size; + memcpy (&msg[1], op->state->bf_data, chunk_size); + op->state->bf_data_size = bf_size; } GNUNET_CONTAINER_bloomfilter_free (local_bf); @@ -456,7 +456,7 @@ send_bloomfilter (struct Operation *op) GNUNET_MQ_send (op->mq, ev); - if (op->state->local_bf_data) + if (op->state->bf_data) send_bloomfilter_multipart (op, chunk_size); } @@ -540,50 +540,19 @@ send_peer_done (struct Operation *op) GNUNET_MQ_send (op->mq, ev); } -/** - * Handle an BF multipart message from a remote peer. - * - * @param cls the intersection operation - * @param mh the header of the message - */ -static void -handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh) -{ - struct Operation *op = cls; - const struct BFPart *msg = (const struct BFPart *) mh; - - if (op->state->phase != PHASE_BF_AWAIT_MULTIPART){ - GNUNET_break_op (0); - fail_intersection_operation(op); - return; - } - - -} /** - * Handle an BF message from a remote peer. + * Process a Bloomfilter once we got all the chunks * - * @param cls the intersection operation - * @param mh the header of the message + * @param op the intersection operation */ static void -handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh) -{ - struct Operation *op = cls; - const struct BFMessage *msg = (const struct BFMessage *) mh; +process_bf (struct Operation *op){ uint32_t old_elements; uint32_t peer_elements; - + old_elements = op->state->my_element_count; - op->spec->salt = ntohl (msg->sender_mutator); - - op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], - ntohl (msg->bloomfilter_total_length), - ntohl (msg->bits_per_element)); - op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, - BLOOMFILTER_SIZE, - GNUNET_CONSTANTS_BLOOMFILTER_K); + peer_elements = op->spec->remote_element_count; switch (op->state->phase) { case PHASE_INITIAL: @@ -613,25 +582,116 @@ handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh) GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); op->state->remote_bf = NULL; - peer_elements = ntohl(msg->sender_element_count); if ((op->state->phase == PHASE_MAYBE_FINISHED) && (old_elements == op->state->my_element_count) && (op->state->my_element_count == peer_elements)){ // In the last round we though we were finished, we now know this is correct - send_peer_done(op); + send_peer_done (op); return; } op->state->phase = PHASE_BF_EXCHANGE; - // maybe we are finished, but we do one more round to make certain - // we don't have false positives ... if (op->state->my_element_count == peer_elements) - op->state->phase = PHASE_MAYBE_FINISHED; + // maybe we are finished, but we do one more round to make certain + // we don't have false positives ... + op->state->phase = PHASE_MAYBE_FINISHED; send_bloomfilter (op); } +/** + * Handle an BF multipart message from a remote peer. + * + * @param cls the intersection operation + * @param mh the header of the message + */ +static void +handle_p2p_bf_part (void *cls, const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + const struct BFPart *msg = (const struct BFPart *) mh; + uint32_t chunk_size; + uint32_t chunk_offset; + + chunk_size = ntohl(msg->chunk_length); + chunk_offset = ntohl(msg->chunk_offset); + + if ((NULL == op->state->bf_data) + || (op->state->bf_data_size < chunk_size + chunk_offset)){ + // unexpected multipart chunk + GNUNET_break_op (0); + fail_intersection_operation(op); + return; + } + + memcpy (&op->state->bf_data[chunk_offset], (const char*) &msg[1], chunk_size); + + if (op->state->bf_data_size > chunk_size + chunk_offset) + // wait for next chunk + return; + + op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], + op->state->bf_data_size, + op->state->bf_bits_per_element); + + GNUNET_free (op->state->bf_data); + op->state->bf_data = NULL; + + process_bf (op); +} + + +/** + * Handle an BF message from a remote peer. + * + * @param cls the intersection operation + * @param mh the header of the message + */ +static void +handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh) +{ + struct Operation *op = cls; + const struct BFMessage *msg = (const struct BFMessage *) mh; + uint32_t bf_size; + uint32_t chunk_size; + uint32_t bf_bits_per_element; + + switch (op->state->phase) + { + case PHASE_INITIAL: + case PHASE_BF_EXCHANGE: + case PHASE_MAYBE_FINISHED: + if (NULL == op->state->bf_data) { + // no colliding multipart transaction going on currently + op->spec->salt = ntohl (msg->sender_mutator); + bf_size = ntohl (msg->bloomfilter_total_length); + bf_bits_per_element = ntohl (msg->bits_per_element); + chunk_size = ntohl (msg->bloomfilter_length); + op->spec->remote_element_count = ntohl(msg->sender_element_count); + if (bf_size == chunk_size) { + // single part, done here + op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1], + bf_size, + bf_bits_per_element); + process_bf (op); + return; + } + + //first multipart chunk + op->state->bf_data = GNUNET_malloc (bf_size); + op->state->bf_data_size = bf_size; + op->state->bf_bits_per_element = bf_bits_per_element; + memcpy (op->state->bf_data, (const char*) &msg[1], chunk_size); + return; + } + default: + GNUNET_break_op (0); + fail_intersection_operation (op); + } +} + + /** * Handle an BF message from a remote peer. * diff --git a/src/set/set_protocol.h b/src/set/set_protocol.h index 9d39abba8..b48809a3c 100644 --- a/src/set/set_protocol.h +++ b/src/set/set_protocol.h @@ -140,12 +140,12 @@ struct BFPart /** * Length of the appended bloomfilter data block */ - uint32_t bloomfilter_length GNUNET_PACKED; + uint32_t chunk_length GNUNET_PACKED; /** * offset in the bloolfilter data block, if multipart message */ - uint32_t bloomfilter_offset GNUNET_PACKED; + uint32_t chunk_offset GNUNET_PACKED; /** * rest: the sender's bloomfilter