From a0fb29fdfc59dbe7c08b61a713685a02d5492750 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 1 Feb 2010 15:47:01 +0000 Subject: [PATCH] fixing fs FIXMEs --- TODO | 12 +-- src/fs/gnunet-service-fs.c | 182 ++++++++++++++++++++++++++++++------- 2 files changed, 157 insertions(+), 37 deletions(-) diff --git a/TODO b/TODO index f1ffad921..6b3af6d6b 100644 --- a/TODO +++ b/TODO @@ -14,11 +14,9 @@ away), in order in which they will likely be done: Urgent items (before announcing ng.gnunet.org): * TRANSPORT: - - main service not implemented [Nate] - - testcases crash & burn (no surprise) + - nondeterministic transport testcase failures [Nate] * CORE: - - test currently fails spectacularly [segv of transport service] - => need transport to work first! + - test currently fails - request connect not working [Christian, need transport first] * TOPOLOGY: - needs testing [need transport first] @@ -28,6 +26,8 @@ Urgent items (before announcing ng.gnunet.org): - implement FS service (P2P operations) + implement sending of queries (low-priority push!) + need to bound queueing of replies for other peers + + content migration support + + hot path routing - test multi-peer search/download * new webpage - run peer => have a 0.9.x hostlist @@ -68,8 +68,8 @@ Urgent items (before announcing ng.gnunet.org): + gnunet-download (many options) + gnunet-directory (man page, options) + gnunet-pseudonym (all of it) - + gnunet-service-fs (remove failing on-demand blocks, many other nitpicks/features/optimizations) - + datastore: do active migration support here? + + gnunet-service-fs (remove failing on-demand blocks, stats, load-based routing, nitpicks) + + datastore: do active migration support here? - implement adv. FS testcases + getopt API + insert: sblocks, loc uris diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 93123c8d8..2f9af1ad1 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -24,14 +24,11 @@ * @author Christian Grothoff * * TODO: - * - forward_request_task (P2P forwarding!) * - track stats for hot-path routing * - implement hot-path routing decision procedure - * - detect duplicate requests (P2P and CS) * - implement: bound_priority, test_load_too_high, validate_skblock * - add content migration support (store locally) * - statistics - * */ #include "platform.h" #include @@ -527,8 +524,7 @@ struct PendingRequest uint32_t remaining_priority; /** - * Number to mingle hashes for bloom-filter - * tests with. + * Number to mingle hashes for bloom-filter tests with. */ int32_t mingle; @@ -655,9 +651,9 @@ destroy_pending_request (struct PendingRequest *pr) pr->hnode); pr->hnode = NULL; } - /* might have already been removed from map - in 'process_reply' if there was a unique - reply; hence ignore the return value here */ + /* might have already been removed from map in 'process_reply' (if + there was a unique reply) or never inserted if it was a + duplicate; hence ignore the return value here */ (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map, &pr->query, pr); @@ -1127,7 +1123,6 @@ transmit_query_continuation (void *cls, } -#if 0 /** * How many bytes should a bloomfilter be if we have already seen * entry_count responses? Note that BLOOMFILTER_K gives us the number @@ -1193,7 +1188,6 @@ refresh_bloomfilter (unsigned int count, } return bf; } -#endif /** @@ -1244,6 +1238,7 @@ target_reservation_cb (void *cls, size_t msize; unsigned int k; int no_route; + uint32_t bm; pr->irc = NULL; GNUNET_assert (peer != NULL); @@ -1267,12 +1262,22 @@ target_reservation_cb (void *cls, /* build message and insert message into priority queue */ k = 0; + bm = 0; + if (GNUNET_YES == no_route) + { + bm |= GET_MESSAGE_BIT_RETURN_TO; + k++; + } if (pr->namespace != NULL) - k++; + { + bm |= GET_MESSAGE_BIT_SKS_NAMESPACE; + k++; + } if (pr->target_pid != 0) - k++; - if (GNUNET_YES == no_route) - k++; + { + bm |= GET_MESSAGE_BIT_TRANSMIT_TO; + k++; + } msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode); GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); @@ -1284,17 +1289,17 @@ target_reservation_cb (void *cls, pr->remaining_priority /= 2; gm->priority = htonl (pr->remaining_priority); gm->ttl = htonl (pr->ttl); - gm->filter_mutator = htonl(pr->mingle); // FIXME: bad endianess conversion? - gm->hash_bitmap = htonl (42); // FIXME! + gm->filter_mutator = htonl(pr->mingle); + gm->hash_bitmap = htonl (bm); gm->query = pr->query; ext = (GNUNET_HashCode*) &gm[1]; k = 0; + if (GNUNET_YES == no_route) + GNUNET_PEER_resolve (pr->pht_entry->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]); if (pr->namespace != NULL) memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode)); if (pr->target_pid != 0) GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]); - if (GNUNET_YES == no_route) - GNUNET_PEER_resolve (pr->pht_entry->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]); bfdata = (char *) &ext[k]; if (pr->bf != NULL) GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, @@ -1709,9 +1714,15 @@ process_reply (void *cls, GNUNET_array_grow (pr->replies_seen, pr->replies_seen_size, pr->replies_seen_size * 2 + 4); - // FIXME: recalculate BF! + if (pr->bf != NULL) + GNUNET_CONTAINER_bloomfilter_free (pr->bf); + pr->bf = refresh_bloomfilter (pr->replies_seen_off, + &pr->mingle, + &pr->bf_size, + pr->replies_seen); } - pr->replies_seen[pr->replies_seen_off++] = chash; + pr->replies_seen[pr->replies_seen_off++] = chash; + } break; case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK: @@ -2047,6 +2058,53 @@ bound_priority (uint32_t prio_in, } +/** + * Closure for 'check_duplicate_request'. + */ +struct CheckDuplicateRequestClosure +{ + /** + * The new request we should check if it already exists. + */ + const struct PendingRequest *pr; + + /** + * Existing request found by the checker, NULL if none. + */ + struct PendingRequest *have; +}; + + +/** + * Iterator over entries in the 'query_request_map' that + * tries to see if we have the same request pending from + * the same peer already. + * + * @param cls closure (our 'struct CheckDuplicateRequestClosure') + * @param key current key code (query, ignored, must match) + * @param value value in the hash map (a 'struct PendingRequest' + * that already exists) + * @return GNUNET_YES if we should continue to + * iterate (no match yet) + * GNUNET_NO if not (match found). + */ +static int +check_duplicate_request (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct CheckDuplicateRequestClosure *cdc = cls; + struct PendingRequest *have = value; + + if (cdc->pr->target_pid == have->target_pid) + { + cdc->have = have; + return GNUNET_NO; + } + return GNUNET_YES; +} + + /** * Handle P2P "GET" request. * @@ -2070,6 +2128,7 @@ handle_p2p_get (void *cls, struct PeerRequestEntry *pre; struct ConnectedPeer *cp; struct ConnectedPeer *cps; + struct CheckDuplicateRequestClosure cdc; struct GNUNET_TIME_Relative timeout; uint16_t msize; const struct GetMessage *gm; @@ -2103,7 +2162,6 @@ handle_p2p_get (void *cls, } opt = (const GNUNET_HashCode*) &gm[1]; bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode); - bm = ntohl (gm->hash_bitmap); if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) && (ntohl (gm->type) == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ) @@ -2150,7 +2208,7 @@ handle_p2p_get (void *cls, if ((bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) pr->namespace = (GNUNET_HashCode*) &pr[1]; pr->type = ntohl (gm->type); - pr->mingle = gm->filter_mutator; + pr->mingle = ntohl (gm->filter_mutator); if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode)); else if (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) @@ -2194,8 +2252,35 @@ handle_p2p_get (void *cls, pr->bf_size = bfsize; } - /* FIXME: check somewhere if request already exists, and if so, - recycle old state... */ + cdc.have = NULL; + cdc.pr = pr; + GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, + &gm->query, + &check_duplicate_request, + &cdc); + if (cdc.have != NULL) + { + if (cdc.have->start_time.value + cdc.have->ttl >= + pr->start_time.value + pr->ttl) + { + /* existing request has higher TTL, drop new one! */ + cdc.have->priority += pr->priority; + destroy_pending_request (pr); + return GNUNET_OK; + } + else + { + /* existing request has lower TTL, drop old one! */ + pr->priority += cdc.have->priority; + /* Possible optimization: if we have applicable pending + replies in 'cdc.have', we might want to move those over + (this is a really rare special-case, so it is not clear + that this would be worth it) */ + destroy_pending_request (cdc.have); + /* keep processing 'pr'! */ + } + } + pre = GNUNET_malloc (sizeof (struct PeerRequestEntry)); pre->cp = cp; pre->req = pr; @@ -2206,7 +2291,7 @@ handle_p2p_get (void *cls, pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, pr, - GNUNET_TIME_absolute_get().value + pr->ttl); + pr->start_time.value + pr->ttl); /* calculate change in traffic preference */ @@ -2306,8 +2391,41 @@ handle_start_search (void *cls, GNUNET_h2s (&sm->query), (unsigned int) type); #endif - /* FIXME: detect duplicate request; if duplicate, simply update (merge) - 'pr->replies_seen'! */ + + /* detect duplicate KBLOCK requests */ + if (type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) + { + crl = cl->rl_head; + while ( (crl != NULL) && + ( (0 != memcmp (&crl->req->query, + &sm->query, + sizeof (GNUNET_HashCode))) || + (crl->req->type != type) ) ) + crl = crl->next; + if (crl != NULL) + { + pr = crl->req; + /* Duplicate request (used to send long list of + known/blocked results); merge 'pr->replies_seen' + and update bloom filter */ + GNUNET_array_grow (pr->replies_seen, + pr->replies_seen_size, + pr->replies_seen_off + sc); + memcpy (&pr->replies_seen[pr->replies_seen_off], + &sm[1], + sc * sizeof (GNUNET_HashCode)); + pr->replies_seen_off += sc; + if (pr->bf != NULL) + GNUNET_CONTAINER_bloomfilter_free (pr->bf); + pr->bf = refresh_bloomfilter (pr->replies_seen_off, + &pr->mingle, + &pr->bf_size, + pr->replies_seen); + GNUNET_SERVER_receive_done (client, + GNUNET_OK); + return; + } + } pr = GNUNET_malloc (sizeof (struct PendingRequest) + ((type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)?sizeof(GNUNET_HashCode):0)); crl = GNUNET_malloc (sizeof (struct ClientRequestList)); @@ -2326,10 +2444,12 @@ handle_start_search (void *cls, &sm[1], sc * sizeof (GNUNET_HashCode)); pr->replies_seen_off = sc; - pr->anonymity_level = ntohl (sm->anonymity_level); - pr->mingle = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, - (uint32_t) -1); - pr->query = sm->query; + pr->anonymity_level = ntohl (sm->anonymity_level); + pr->bf = refresh_bloomfilter (pr->replies_seen_off, + &pr->mingle, + &pr->bf_size, + pr->replies_seen); + pr->query = sm->query; switch (type) { case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: -- 2.25.1