From: Christian Grothoff Date: Tue, 22 Sep 2009 17:33:31 +0000 (+0000) Subject: more work on fs X-Git-Tag: initial-import-from-subversion-38251~23481 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=9360aeb6e64646e9912642fc45d51e030003cb98;p=oweals%2Fgnunet.git more work on fs --- diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 0a9fbcefe..b9a2dfa79 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -24,18 +24,18 @@ * @author Christian Grothoff * * TODO: - * - tracking of PendingRequests - * - setup P2P search on CS request - * - setup P2P search on P2P GET - * - forward replies based on tracked requests * - validation of KBLOCKS (almost done) * - validation of SBLOCKS * - validation of KSBLOCKS - * - content migration (put in local DS) - * - possible major issue: we may - * queue "gazillions" of (K|S)Blocks for the - * core to transmit to another peer; need - * to make sure this is bounded overall... + * - actually DO P2P search upon P2P/CS requests (pass appropriate handler to core + * and work through request list there; also update ttl/priority for our client's requests) + * - randomly delay processing for improved anonymity (can wait) + * - content migration (put in local DS) (can wait) + * - check that we decrement PIDs always where necessary (can wait) + * - handle some special cases when forwarding replies based on tracked requests (can wait) + * - tracking of success correlations for hot-path routing (can wait) + * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the + * core to transmit to another peer; need to make sure this is bounded overall... 
* - various load-based actions (can wait) * - remove on-demand blocks if they keep failing (can wait) */ @@ -202,13 +202,14 @@ struct LocalGetContext * level is >0 we may happen to know the responder's identity; * nevertheless, we should probably not use it for a DHT-lookup * or similar blunt actions in order to avoid exposing ourselves). - *

+ */ + struct GNUNET_PeerIdentity target; + + /** * If the request is for an SBLOCK, this is the identity of the * pseudonym to which the SBLOCK belongs. - *

- * If the request is for a KBLOCK, "target" must be all zeros. */ - GNUNET_HashCode target; + GNUNET_HashCode namespace; /** * Hash of the keyword (aka query) for KBLOCKs; Hash of @@ -341,6 +342,25 @@ struct ProcessGetContext }; +/** + * Information we keep for each pending reply. + */ +struct PendingReply +{ + /** + * This is a linked list. + */ + struct PendingReply *next; + + /** + * Size of the reply; actual reply message follows + * at the end of this struct. + */ + size_t msize; + +}; + + /** * All requests from a client are * kept in a doubly-linked list. @@ -381,6 +401,25 @@ struct PendingRequest */ struct GNUNET_CONTAINER_BloomFilter *bf; + /** + * Replies that we have received but were unable to forward yet + * (typically non-null only if we have a pending transmission + * request with the client or the respective peer). + */ + struct PendingReply *replies_pending; + + /** + * Pending transmission request with the core service for the target + * peer (for processing of 'replies_pending'). + */ + struct GNUNET_CORE_TransmitHandle *cth; + + /** + * Pending transmission request for the target client (for processing of + * 'replies_pending'). + */ + struct GNUNET_CONNECTION_TransmitHandle *th; + /** * Hash code of all replies that we have seen so far (only valid * if client is not NULL since we only track replies like this for @@ -417,6 +456,11 @@ struct PendingRequest */ GNUNET_PEER_Id *used_pids; + /** + * Desired anonymity level; only valid for requests from a local client. + */ + uint32_t anonymity_level; + /** * How many entries in "used_pids" are actually valid? */ @@ -451,6 +495,12 @@ struct PendingRequest */ uint32_t remaining_priority; + /** + * Number to mingle hashes for bloom-filter + * tests with. + */ + int32_t mingle; + /** * TTL with which we saw this request (or, if we initiated, TTL that * we used for the request). 
@@ -551,12 +601,6 @@ struct ProcessReplyClosure }; -/** - * Map from queries to pending requests ("struct PendingRequest") for - * this query. - */ -static struct GNUNET_CONTAINER_MultiHashMap *request_map; - /** * Our connection to the datastore. */ @@ -603,10 +647,9 @@ static struct DatastoreRequestQueue *drq_tail; static struct IndexInfo *indexed_files; /** - * Maps hash over content of indexed files - * to the respective filename. The filenames - * are pointers into the indexed_files linked - * list and do not need to be freed. + * Maps hash over content of indexed files to the respective filename. + * The filenames are pointers into the indexed_files linked list and + * do not need to be freed. */ static struct GNUNET_CONTAINER_MultiHashMap *ifm; @@ -627,10 +670,19 @@ static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer; static struct ClientList *clients; /** - * Heap with the request that will expire next at the top. + * Heap with the request that will expire next at the top. Contains + * pointers of type "struct PendingRequest*"; these will *also* be + * aliased from the "requests_by_peer" data structures and the + * "requests_by_query" table. Note that requests from our clients + * don't expire and are thus NOT in the "requests_by_expiration" + * (or the "requests_by_peer" tables). */ static struct GNUNET_CONTAINER_Heap *requests_by_expiration; +/** + * FIXME: set from configuration. + */ +static uint64_t max_pending_requests = 32; /** * Write the current index information list to disk. @@ -1332,6 +1384,70 @@ handle_on_demand_block (const GNUNET_HashCode * key, } +/** + * How many bytes should a bloomfilter be if we have already seen + * entry_count responses? Note that BLOOMFILTER_K gives us the number + * of bits set per entry. Furthermore, we should not re-size the + * filter too often (to keep it cheap). 
+ * + * Since other peers will also add entries but not resize the filter, + * we should generally pick a slightly larger size than what the + * strict math would suggest. + * + * @return must be a power of two and smaller or equal to 2^15. + */ +static unsigned int +compute_bloomfilter_size (unsigned int entry_count) +{ + unsigned int size; + unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4; + uint16_t max = 1 << 15; + + if (entry_count > max) + return max; + size = 8; + while ((size < max) && (size < ideal)) + size *= 2; + if (size > max) + return max; + return size; +} + + +/** + * Recalculate our bloom filter for filtering replies. + * + * @param count number of entries we are filtering right now + * @param mingle set to our new mingling value + * @param entries the entries to filter + * @return updated bloomfilter, NULL for none + */ +static struct GNUNET_CONTAINER_BloomFilter * +refresh_bloomfilter (unsigned int count, + int32_t * mingle, + const GNUNET_HashCode *entries) +{ + struct GNUNET_CONTAINER_BloomFilter *bf; + unsigned int nsize; + unsigned int i; + GNUNET_HashCode mhash; + + if (0 == count) + return NULL; + nsize = compute_bloomfilter_size (count); + *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1); + bf = GNUNET_CONTAINER_bloomfilter_init (NULL, + nsize, + BLOOMFILTER_K); + for (i=0;itype == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) || (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) { - // FIXME: initiate P2P search - return; + cl = clients; + while ( (NULL != cl) && + (cl->client != lgc->client) ) + cl = cl->next; + if (cl == NULL) + { + cl = GNUNET_malloc (sizeof (struct ClientList)); + cl->client = lgc->client; + cl->next = clients; + clients = cl; + } + crl = GNUNET_malloc (sizeof (struct ClientRequestList)); + GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl); + pr = GNUNET_malloc (sizeof (struct PendingRequest)); + pr->client = lgc->client; + GNUNET_SERVER_client_keep (pr->client); + pr->crl_entry = crl; + crl->req = 
pr; + if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) + { + pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode)); + *pr->namespace = lgc->namespace; + } + pr->replies_seen = lgc->results; + lgc->results = NULL; + pr->start_time = GNUNET_TIME_absolute_get (); + pr->query = lgc->query; + pr->target_pid = GNUNET_PEER_intern (&lgc->target); + pr->replies_seen_off = lgc->results_used; + pr->replies_seen_size = lgc->results_size; + lgc->results_size = 0; + pr->type = lgc->type; + pr->anonymity_level = lgc->anonymity_level; + pr->bf = refresh_bloomfilter (pr->replies_seen_off, + &pr->mingle, + pr->replies_seen); + GNUNET_CONTAINER_multihashmap_put (requests_by_query, + &pr->query, + pr, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + + // FIXME: trigger some processing NOW! + local_get_context_free (lgc); + return; } /* got all possible results, clean up! */ local_get_context_free (lgc); @@ -1529,7 +1690,18 @@ handle_start_search (void *cls, lgc->client = client; lgc->type = ntohl (sm->type); lgc->anonymity_level = ntohl (sm->anonymity_level); - lgc->target = sm->target; + switch (lgc->type) + { + case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: + case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK: + lgc->target.hashPubKey = sm->target; + break; + case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK: + lgc->namespace = sm->target; + break; + default: + break; + } lgc->query = sm->query; GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc); lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL, @@ -1565,15 +1737,27 @@ static struct GNUNET_SERVER_MessageHandler handlers[] = { static void destroy_pending_request (struct PendingRequest *pr) { + struct PendingReply *reply; + GNUNET_CONTAINER_multihashmap_remove (requests_by_query, &pr->query, pr); // FIXME: not sure how this can work (efficiently) // also, what does the return value mean? 
-  GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
-                                     pr);
+  if (pr->client == NULL) /* only P2P requests are kept in the expiration heap */
+    GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
+                                       pr);
   if (NULL != pr->bf)
     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
+  if (NULL != pr->cth)
+    GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
+  if (NULL != pr->th)
+    GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
+  while (NULL != (reply = pr->replies_pending))
+    {
+      pr->replies_pending = reply->next;
+      GNUNET_free (reply);
+    }
   GNUNET_PEER_change_rc (pr->source_pid, -1);
   GNUNET_PEER_change_rc (pr->target_pid, -1);
   GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
@@ -1648,9 +1832,13 @@ shutdown_task (void *cls,
   GNUNET_DATASTORE_disconnect (dsh,
                                GNUNET_NO);
   dsh = NULL;
-  // FIXME: iterate over 'request_map' to free entries!
-  GNUNET_CONTAINER_multihashmap_destroy (request_map);
-  request_map = NULL;
+  // FIXME: iterate over maps to free entries!
+  GNUNET_CONTAINER_multihashmap_destroy (requests_by_query);
+  requests_by_query = NULL;
+  GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer);
+  requests_by_peer = NULL;
+  GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
+  requests_by_expiration = NULL;
   GNUNET_CONTAINER_multihashmap_destroy (ifm);
   ifm = NULL;
   while (NULL != (pos = indexed_files))
@@ -1716,12 +1904,52 @@ forward_get_request (void *cls,
                      const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct ProcessGetContext *pgc = cls;
+  struct PendingRequest *pr;
+  struct PendingRequest *eer;
+  struct GNUNET_PeerIdentity target;

-  // FIXME: install entry in
-  // 'request_map' and do actual
-  // forwarding...
- if (pgc->bf != NULL) - GNUNET_CONTAINER_bloomfilter_free (pgc->bf); + pr = GNUNET_malloc (sizeof (struct PendingRequest)); + if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm)) + { + pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode)); + *pr->namespace = pgc->namespace; + } + pr->bf = pgc->bf; + pgc->bf = NULL; + pr->start_time = pgc->start_time; + pr->query = pgc->query; + pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to); + if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm)) + pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target); + pr->anonymity_level = 1; /* default */ + pr->priority = pgc->priority; + pr->remaining_priority = pr->priority; + pr->mingle = pgc->mingle; + pr->ttl = pgc->ttl; + pr->type = pgc->type; + GNUNET_CONTAINER_multihashmap_put (requests_by_query, + &pr->query, + pr, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + GNUNET_CONTAINER_multihashmap_put (requests_by_peer, + &pgc->reply_to.hashPubKey, + pr, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + GNUNET_CONTAINER_heap_insert (requests_by_expiration, + pr, + pr->start_time.value + pr->ttl); + if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests) + { + /* expire oldest request! */ + eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration); + GNUNET_PEER_resolve (eer->source_pid, + &target); + GNUNET_CONTAINER_multihashmap_remove (requests_by_peer, + &target.hashPubKey, + eer); + destroy_pending_request (eer); + } + // FIXME: trigger actual forwarding NOW! GNUNET_free (pgc); } @@ -1898,9 +2126,8 @@ process_p2p_get_result (void *cls, /** - * We're processing a GET request from - * another peer. Give it to our local - * datastore. + * We're processing a GET request from another peer. Give it to our + * local datastore. * * @param cls our "struct ProcessGetContext" * @param ok did we get a datastore slice or not? 
@@ -2149,6 +2376,42 @@ handle_p2p_get (void *cls,
 }


+/**
+ * Function called to notify us that we can now transmit a reply to a
+ * client or peer.  Copies as many queued replies as fit into "buf" and
+ * frees each copied reply; stops at the first reply that does not fit.
+ *
+ * @param cls closure, points to a "struct PendingRequest*" with
+ *  one or more pending replies
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_result (void *cls,
+                 size_t size,
+                 void *buf)
+{
+  struct PendingRequest *pr = cls;
+  char *cbuf = buf;
+  struct PendingReply *reply;
+  size_t ret;
+
+  ret = 0;
+  while (NULL != (reply = pr->replies_pending))
+    {
+      if ( (reply->msize + ret < ret) || /* size_t overflow */
+           (reply->msize + ret > size) ) /* does not fit anymore */
+        break;
+      pr->replies_pending = reply->next;
+      memcpy (&cbuf[ret], &reply[1], reply->msize);
+      ret += reply->msize;
+      GNUNET_free (reply); /* release the queued reply, never the request */
+    }
+  return ret; /* report what was actually copied into buf */
+}
+
+
 /**
  * Iterator over pending requests.
  *
@@ -2164,10 +2427,138 @@ process_reply (void *cls,
 {
   struct ProcessReplyClosure *prq = cls;
   struct PendingRequest *pr = value;
-
-  fprintf (stderr, "FIXME %p %p\n", prq, pr);
-  // FIXME: forward reply to client
-  // or other peers (depending on pr...)
+  struct PendingRequest *eer;
+  struct PendingReply *reply;
+  struct PutMessage *pm;
+  struct ContentMessage *cm;
+  GNUNET_HashCode chash;
+  GNUNET_HashCode mhash;
+  struct GNUNET_PeerIdentity target;
+  size_t msize;
+  uint32_t prio;
+  struct GNUNET_TIME_Relative max_delay;
+
+  GNUNET_CRYPTO_hash (prq->data,
+                      prq->size,
+                      &chash);
+  switch (prq->type)
+    {
+    case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
+    case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
+      if (0 != memcmp (&chash,
+                       key,
+                       sizeof (GNUNET_HashCode)))
+        {
+          GNUNET_break_op (0);
+          return GNUNET_YES;
+        }
+      break;
+    case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
+      // FIXME: validate KBlock!
+ if (pr->bf != NULL) + { + mingle_hash (&chash, pr->mingle, &mhash); + if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf, + &mhash)) + return GNUNET_YES; /* duplicate */ + GNUNET_CONTAINER_bloomfilter_add (pr->bf, + &mhash); + } + break; + case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK: + // FIXME: validate SBlock! + if (pr->bf != NULL) + { + mingle_hash (&chash, pr->mingle, &mhash); + if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf, + &mhash)) + return GNUNET_YES; /* duplicate */ + GNUNET_CONTAINER_bloomfilter_add (pr->bf, + &mhash); + } + break; + case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK: + // FIXME: validate SKBlock! + break; + } + prio = pr->priority; + prq->priority += pr->remaining_priority; + pr->remaining_priority = 0; + if (pr->client != NULL) + { + if (pr->replies_seen_size == pr->replies_seen_off) + GNUNET_array_grow (pr->replies_seen, + pr->replies_seen_size, + pr->replies_seen_size * 2 + 4); + pr->replies_seen[pr->replies_seen_off++] = chash; + // FIXME: possibly recalculate BF! 
+    }
+  if (pr->client == NULL)
+    {
+      msize = sizeof (struct ContentMessage) + prq->size;
+      reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
+      reply->msize = msize;
+      cm = (struct ContentMessage*) &reply[1];
+      cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
+      cm->header.size = htons (msize);
+      cm->type = htonl (prq->type);
+      cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
+      reply->next = pr->replies_pending;
+      pr->replies_pending = reply;
+      memcpy (&cm[1], prq->data, prq->size); /* payload follows the header, must not overwrite it */
+      if (pr->cth != NULL)
+        return GNUNET_YES;
+      max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
+      if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
+        {
+          /* estimate expiration time from time difference between
+             first request that will be discarded and this request */
+          eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
+          max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
+                                                           eer->start_time);
+        }
+      GNUNET_PEER_resolve (pr->source_pid,
+                           &target);
+      pr->cth = GNUNET_CORE_notify_transmit_ready (core,
+                                                   prio,
+                                                   max_delay,
+                                                   &target,
+                                                   msize,
+                                                   &transmit_result,
+                                                   pr);
+      if (NULL == pr->cth)
+        {
+          // FIXME: now what? discard?
+        }
+    }
+  else
+    {
+      msize = sizeof (struct PutMessage) + prq->size;
+      reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
+      reply->msize = msize;
+      reply->next = pr->replies_pending;
+      pm = (struct PutMessage*) &reply[1];
+      pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+      pm->header.size = htons (msize);
+      pm->type = htonl (prq->type);
+      pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
+      pr->replies_pending = reply;
+      memcpy (&pm[1], prq->data, prq->size); /* payload follows the header, must not overwrite it */
+      if (pr->th != NULL)
+        return GNUNET_YES;
+      pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
+                                                    msize,
+                                                    GNUNET_TIME_UNIT_FOREVER_REL,
+                                                    &transmit_result,
+                                                    pr);
+      if (pr->th == NULL)
+        {
+          // FIXME: need to try again later (not much
+          // to do here specifically, but we need to
+          // check somewhere else to handle this case!)
+        }
+    }
+  // FIXME: implement hot-path routing statistics keeping!
   return GNUNET_YES;
 }

@@ -2256,7 +2647,7 @@ handle_p2p_put (void *cls,
   prq.type = type;
   prq.expiration = expiration;
   prq.priority = 0;
-  GNUNET_CONTAINER_multihashmap_get_multiple (request_map,
+  GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
                                               &query,
                                               &process_reply,
                                               &prq);
@@ -2364,7 +2755,9 @@ run (void *cls,

   cfg = c;
   ifm = GNUNET_CONTAINER_multihashmap_create (128);
-  request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
+  requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
+  requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
+  requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
   read_index_list ();
   dsh = GNUNET_DATASTORE_connect (cfg,
                                   sched);