From c8a23b6fce4dfadda4485492f10a4accc954b111 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 5 Oct 2010 14:45:38 +0000 Subject: [PATCH] mess --- src/dht/gnunet-service-dht.c | 179 ++++++++++++++++++++--------- src/dht/test_dht_api.c | 2 +- src/dht/test_dht_api_peer1.conf | 1 + src/dht/test_dht_multipeer.c | 2 +- src/dht/test_dht_twopeer_put_get.c | 2 +- 5 files changed, 126 insertions(+), 60 deletions(-) diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c index 577519268..9431ad20a 100644 --- a/src/dht/gnunet-service-dht.c +++ b/src/dht/gnunet-service-dht.c @@ -159,10 +159,6 @@ */ #define DEFAULT_MALICIOUS_PUT_FREQUENCY 1000 /* Default is in milliseconds */ -/** - * Type for a malicious request, so we can ignore it during testing - */ -#define DHT_MALICIOUS_MESSAGE_TYPE 42 #define DHT_DEFAULT_PING_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 1) @@ -418,6 +414,16 @@ struct DHT_MessageContext */ struct GNUNET_CONTAINER_BloomFilter *bloom; + /** + * extended query (see gnunet_block_lib.h). + */ + const void *xquery; + + /** + * Bloomfilter to filter out duplicate replies. + */ + struct GNUNET_CONTAINER_BloomFilter *reply_bf; + /** * The key this request was about */ @@ -433,6 +439,16 @@ struct DHT_MessageContext */ uint64_t unique_id; + /** + * Number of bytes in xquery. + */ + size_t xquery_size; + + /** + * Mutator value for the reply_bf, see gnunet_block_lib.h + */ + uint32_t reply_bf_mutator; + /** * Desired replication level */ @@ -460,6 +476,11 @@ struct DHT_MessageContext */ unsigned int importance; + /** + * Should we (still) forward the request on to other peers? + */ + int do_forward; + /** * Did we forward this message? (may need to remember it!) */ @@ -2119,36 +2140,67 @@ static int datacache_get_iterator (void *cls, struct GNUNET_TIME_Absolute exp, const GNUNET_HashCode * key, - uint32_t size, const char *data, uint32_t type) + uint32_t size, const char *data, + enum GNUNET_BLOCK_Type type) { struct DHT_MessageContext *msg_ctx = cls; struct DHT_MessageContext *new_msg_ctx; struct GNUNET_DHT_GetResultMessage *get_result; + enum GNUNET_BLOCK_EvaluationResult eval; + #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Received `%s' response from datacache\n", my_short_id, "DHT", "GET"); -#endif - new_msg_ctx = GNUNET_malloc(sizeof(struct DHT_MessageContext)); - memcpy(new_msg_ctx, msg_ctx, sizeof(struct DHT_MessageContext)); - get_result = - GNUNET_malloc (sizeof (struct GNUNET_DHT_GetResultMessage) + size); - get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT); - get_result->header.size = - htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size); - get_result->expiration = GNUNET_TIME_absolute_hton(exp); - get_result->type = htons (type); - memcpy (&get_result[1], data, size); - new_msg_ctx->peer = &my_identity; - new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K); - new_msg_ctx->hop_count = 0; - new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE * 2; /* Make result routing a higher priority */ - new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT; - increment_stats(STAT_GET_RESPONSE_START); - route_result_message(cls, &get_result->header, new_msg_ctx); - GNUNET_free(new_msg_ctx); - //send_reply_to_client (datacache_get_ctx->client, &get_result->header, - // datacache_get_ctx->unique_id); - GNUNET_free (get_result); +#endif + eval = GNUNET_BLOCK_evaluate (block_context, + type, + key, + &msg_ctx->reply_bf, + msg_ctx->reply_bf_mutator, + msg_ctx->xquery, + msg_ctx->xquery_size, + data, + size); + switch (eval) + { + case GNUNET_BLOCK_EVALUATION_OK_LAST: + msg_ctx->do_forward = GNUNET_NO; + case GNUNET_BLOCK_EVALUATION_OK_MORE: + new_msg_ctx = GNUNET_malloc(sizeof(struct DHT_MessageContext)); + memcpy(new_msg_ctx, msg_ctx, sizeof(struct DHT_MessageContext)); + get_result = + GNUNET_malloc (sizeof (struct GNUNET_DHT_GetResultMessage) + size); + get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT); + get_result->header.size = + htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size); + get_result->expiration = GNUNET_TIME_absolute_hton(exp); + get_result->type = htons (type); + memcpy (&get_result[1], data, size); + new_msg_ctx->peer = &my_identity; + new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K); + new_msg_ctx->hop_count = 0; + new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE * 2; /* Make result routing a higher priority */ + new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT; + increment_stats(STAT_GET_RESPONSE_START); + route_result_message(cls, &get_result->header, new_msg_ctx); + GNUNET_free(new_msg_ctx); + GNUNET_free (get_result); + break; + case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: + break; + case GNUNET_BLOCK_EVALUATION_RESULT_INVALID: + break; + case GNUNET_BLOCK_EVALUATION_REQUEST_VALID: + GNUNET_break (0); + break; + case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID: + GNUNET_break_op (0); + msg_ctx->do_forward = GNUNET_NO; + break; + case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED: + /* msg_ctx->do_forward = GNUNET_NO; // not sure... */ + break; + } return GNUNET_OK; } @@ -2181,14 +2233,11 @@ handle_dht_get (void *cls, struct DHT_MessageContext *message_context) { const struct GNUNET_DHT_GetMessage *get_msg; - uint16_t get_type; - uint16_t bf_size; uint16_t msize; - uint16_t xquery_size; + uint16_t bf_size; unsigned int results; - struct GNUNET_CONTAINER_BloomFilter *bf; - const void *xquery; const char *end; + enum GNUNET_BLOCK_Type type; msize = ntohs (msg->size); if (msize < sizeof (struct GNUNET_DHT_GetMessage)) @@ -2198,34 +2247,34 @@ handle_dht_get (void *cls, } get_msg = (const struct GNUNET_DHT_GetMessage *) msg; bf_size = ntohs (get_msg->bf_size); - xquery_size = ntohs (get_msg->xquery_size); - if (msize != sizeof (struct GNUNET_DHT_GetMessage) + bf_size + xquery_size) + message_context->xquery_size = ntohs (get_msg->xquery_size); + message_context->reply_bf_mutator = get_msg->bf_mutator; /* FIXME: ntohl? */ + if (msize != sizeof (struct GNUNET_DHT_GetMessage) + bf_size + message_context->xquery_size) { GNUNET_break (0); return 0; } end = (const char*) &get_msg[1]; - if (xquery_size == 0) + if (message_context->xquery_size == 0) { - xquery = NULL; + message_context->xquery = NULL; } else { - xquery = (const void*) end; - end += xquery_size; + message_context->xquery = (const void*) end; + end += message_context->xquery_size; } if (bf_size == 0) { - bf = NULL; + message_context->reply_bf = NULL; } else { - bf = GNUNET_CONTAINER_bloomfilter_init (end, - bf_size, - GNUNET_DHT_GET_BLOOMFILTER_K); + message_context->reply_bf = GNUNET_CONTAINER_bloomfilter_init (end, + bf_size, + GNUNET_DHT_GET_BLOOMFILTER_K); } - - get_type = ntohs (get_msg->type); + type = (enum GNUNET_BLOCK_Type) ntohl (get_msg->type); #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Received `%s' request, message type %u, key %s, uid %llu\n", @@ -2238,21 +2287,19 @@ handle_dht_get (void *cls, increment_stats(STAT_GETS); results = 0; #if HAVE_MALICIOUS - if (get_type == DHT_MALICIOUS_MESSAGE_TYPE) + if (type == GNUNET_BLOCK_DHT_MALICIOUS_MESSAGE_TYPE) { - GNUNET_CONTAINER_bloomfilter_free (bf); + GNUNET_CONTAINER_bloomfilter_free (message_context->reply_bf); return results; } #endif - /* FIXME: put xquery / bf into message_context and use - them for processing! */ + message_context->do_forward = GNUNET_YES; if (datacache != NULL) results = GNUNET_DATACACHE_get (datacache, - &message_context->key, get_type, + &message_context->key, type, &datacache_get_iterator, message_context); - if (results >= 1) { #if DEBUG_DHT @@ -2277,6 +2324,23 @@ handle_dht_get (void *cls, } #endif } + else + { + /* check query valid */ + if (GNUNET_BLOCK_EVALUATION_REQUEST_INVALID + == GNUNET_BLOCK_evaluate (block_context, + type, + &message_context->key, + &message_context->reply_bf, + message_context->reply_bf_mutator, + message_context->xquery, + message_context->xquery_size, + NULL, 0)) + { + GNUNET_break_op (0); + message_context->do_forward = GNUNET_NO; + } + } if (message_context->hop_count == 0) /* Locally initiated request */ { @@ -2289,8 +2353,9 @@ handle_dht_get (void *cls, } #endif } - route_message2 (msg, message_context); - GNUNET_CONTAINER_bloomfilter_free (bf); + if (message_context->do_forward == GNUNET_YES) + route_message2 (msg, message_context); + GNUNET_CONTAINER_bloomfilter_free (message_context->reply_bf); return results; } @@ -2495,7 +2560,7 @@ handle_dht_put (void *cls, struct DHT_MessageContext *message_context) { struct GNUNET_DHT_PutMessage *put_msg; - size_t put_type; + enum GNUNET_BLOCK_Type put_type; size_t data_size; int ret; struct RepublishContext *put_context; @@ -2505,9 +2570,9 @@ handle_dht_put (void *cls, put_msg = (struct GNUNET_DHT_PutMessage *)msg; - put_type = ntohs (put_msg->type); + put_type = (enum GNUNET_BLOCK_Type) ntohl (put_msg->type); - if (put_type == DHT_MALICIOUS_MESSAGE_TYPE) + if (put_type == GNUNET_BLOCK_DHT_MALICIOUS_MESSAGE_TYPE) return; data_size = ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage); @@ -3756,7 +3821,7 @@ malicious_put_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) put_message.header.size = htons(sizeof(struct GNUNET_DHT_PutMessage)); put_message.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT); - put_message.type = htons(DHT_MALICIOUS_MESSAGE_TYPE); + put_message.type = htonl(GNUNET_BLOCK_DHT_MALICIOUS_MESSAGE_TYPE); put_message.expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_absolute_get_forever()); memset(&message_context, 0, sizeof(struct DHT_MessageContext)); message_context.client = NULL; @@ -3800,7 +3865,7 @@ malicious_get_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) get_message.header.size = htons(sizeof(struct GNUNET_DHT_GetMessage)); get_message.header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET); - get_message.type = htons(DHT_MALICIOUS_MESSAGE_TYPE); + get_message.type = htonl(GNUNET_BLOCK_DHT_MALICIOUS_MESSAGE_TYPE); memset(&message_context, 0, sizeof(struct DHT_MessageContext)); message_context.client = NULL; random_key = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, (uint32_t)-1); diff --git a/src/dht/test_dht_api.c b/src/dht/test_dht_api.c index b4014a92c..6650e43ab 100644 --- a/src/dht/test_dht_api.c +++ b/src/dht/test_dht_api.c @@ -362,7 +362,7 @@ test_get_iterator (void *cls, const struct GNUNET_PeerIdentity * const *get_path, const struct GNUNET_PeerIdentity * const *put_path, enum GNUNET_BLOCK_Type type, - uint32_t size, const void *data) + size_t size, const void *data) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test_get_iterator called (we got a result), stopping get request!\n"); diff --git a/src/dht/test_dht_api_peer1.conf b/src/dht/test_dht_api_peer1.conf index 50cbe7138..06bf9e533 100644 --- a/src/dht/test_dht_api_peer1.conf +++ b/src/dht/test_dht_api_peer1.conf @@ -8,6 +8,7 @@ ACCEPT_FROM6 = ::1; ACCEPT_FROM = 127.0.0.1; HOSTNAME = localhost PORT = 2100 +PREFIX = valgrind [dhtcache] QUOTA = 1000000 diff --git a/src/dht/test_dht_multipeer.c b/src/dht/test_dht_multipeer.c index bc7db2302..293829bc7 100644 --- a/src/dht/test_dht_multipeer.c +++ b/src/dht/test_dht_multipeer.c @@ -394,7 +394,7 @@ void get_result_iterator (void *cls, const struct GNUNET_PeerIdentity * const *get_path, const struct GNUNET_PeerIdentity * const *put_path, enum GNUNET_BLOCK_Type type, - uint32_t size, + size_t size, const void *data) { struct TestGetContext *test_get = cls; diff --git a/src/dht/test_dht_twopeer_put_get.c b/src/dht/test_dht_twopeer_put_get.c index 4d0496db2..9ff2ae18e 100644 --- a/src/dht/test_dht_twopeer_put_get.c +++ b/src/dht/test_dht_twopeer_put_get.c @@ -210,7 +210,7 @@ void get_result_iterator (void *cls, const struct GNUNET_PeerIdentity * const *get_path, const struct GNUNET_PeerIdentity * const *put_path, enum GNUNET_BLOCK_Type type, - uint32_t size, + size_t size, const void *data) { GNUNET_HashCode original_key; /* Key data was stored data under */ -- 2.25.1