From faecd2a496b5d356509b0b6b0157db34e8b3188e Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 10 Mar 2011 09:36:50 +0000 Subject: [PATCH] stuff --- src/fs/gnunet-service-fs_new.c | 106 ++++++++----- src/fs/gnunet-service-fs_pr.c | 277 +++++++++++++++++++++++++++++++-- src/fs/gnunet-service-fs_pr.h | 49 +++--- 3 files changed, 365 insertions(+), 67 deletions(-) diff --git a/src/fs/gnunet-service-fs_new.c b/src/fs/gnunet-service-fs_new.c index f0a1513c3..4b22a0c52 100644 --- a/src/fs/gnunet-service-fs_new.c +++ b/src/fs/gnunet-service-fs_new.c @@ -27,7 +27,7 @@ * - GSF_plan_get_ (!) * - GSF_plan_size_ (?) * - GSF_plan_notify_request_done (!) - * - + * - consider re-issue GSF_dht_lookup_ after non-DHT reply received * * */ @@ -223,6 +223,49 @@ plan (struct GSF_ConnectedPeer *cp, } +/** + * We have a new request, consider forwarding it to the given + * peer. + * + * @param cls the 'struct GSF_PendingRequest' + * @param peer identity of the peer + * @param cp handle to the connected peer record + * @param perf peer performance data + */ +static void +consider_request_for_forwarding (void *cls, + const struct GNUNET_PeerIdentity *peer, + struct GSF_ConnectedPeer *cp, + const struct GSF_PeerPerformanceData *ppd) +{ + struct GSF_PendingRequest *pr = cls; + + plan (cp, pr); +} + + +/** + * Function to be called after we're done processing + * replies from the local lookup. If the result status + * code indicates that there may be more replies, plan + * forwarding the request. + * + * @param cls closure (NULL) + * @param pr the pending request we were processing + * @param result final datastore lookup result + */ +static void +consider_forwarding (void *cls, + struct GSF_PendingRequest *pr, + enum GNUNET_BLOCK_EvaluationResult result) +{ + if (GNUNET_BLOCK_EVALUATION_OK_LAST == result) + return; /* we're done... */ + GSF_iterate_connected_peers_ (&consider_request_for_forwarding, + pr); +} + + /** * Handle P2P "GET" request. * @@ -244,31 +287,37 @@ handle_p2p_get (void *cls, pr = GSF_handle_p2p_query_ (other, message); if (NULL == pr) - return GNUNET_SYSERR; - /* FIXME: local lookup! */ - /* FIXME: after local lookup, trigger forwarding/routing! */ + return GNUNET_SYSERR; + GSF_local_lookup_ (pr, + &consider_forwarding, + NULL); return GNUNET_OK; } /** - * We have a new request, consider forwarding it to the given - * peer. + * We're done with the local lookup, now consider + * P2P processing (depending on request options and + * result status). Also signal that we can now + * receive more request information from the client. * - * @param cls the 'struct GSF_PendingRequest' - * @param peer identity of the peer - * @param cp handle to the connected peer record - * @param perf peer performance data + * @param cls the client doing the request ('struct GNUNET_SERVER_Client') + * @param pr the pending request we were processing + * @param result final datastore lookup result */ static void -consider_request_for_forwarding (void *cls, - const struct GNUNET_PeerIdentity *peer, - struct GSF_ConnectedPeer *cp, - const struct GSF_PeerPerformanceData *ppd) +start_p2p_processing (void *cls, + struct GSF_PendingRequest *pr, + enum GNUNET_BLOCK_EvaluationResult result) { - struct GSF_PendingRequest *pr = cls; + struct GNUNET_SERVER_Client *client = cls; - plan (cp, pr); + GNUNET_SERVER_receive_done (client, + GNUNET_OK); + if (GNUNET_BLOCK_EVALUATION_OK_LAST == result) + return; /* we're done... */ + GSF_dht_lookup_ (pr); + consider_forwarding (NULL, pr, result); } @@ -292,28 +341,9 @@ handle_start_search (void *cls, /* 'GNUNET_SERVER_receive_done was already called! */ return; } - /* FIXME: local lookup, then (after DB done!) receive_done: */ - GNUNET_SERVER_receive_done (client, - GNUNET_OK); -#if 0 - /* FIXME: also do DHT lookup */ - struct GNUNET_DHT_GetHandle *gh; - /* store 'gh' with 'pr', cancel it on pr destruction, etc. */ - gh = GNUNET_DHT_get_start (GSF_dht, - timeout, - type, - key, - des_repl_level, - options, - bf, - bf_mutator, - xquery, - xquery_size, - &GSF_handle_dht_reply_, - pr); -#endif - GSF_iterate_connected_peers_ (&consider_request_for_forwarding, - pr); + GSF_local_lookup_ (pr, + &start_p2p_processing, + client); } diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index 45767f204..58af8be65 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -64,6 +64,16 @@ struct GSF_PendingRequest */ struct GNUNET_CONTAINER_HeapNode *hnode; + /** + * Datastore queue entry for this request (or NULL for none). + */ + struct GNUNET_DATASTORE_QueueEntry *qe; + + /** + * DHT request handle for this request (or NULL for none). + */ + struct GNUNET_DHT_GetHandle *gh; + /** * Identity of the peer that we should use for the 'sender' * (recipient of the response) when forwarding (0 for none). @@ -500,6 +510,10 @@ clean_request (void *cls, if (NULL != pr->hnode) GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap, pr->hnode); + if (NULL != pr->qe) + GNUNET_DATASTORE_cancel (pr->qe); + if (NULL != pr->gh) + GNUNET_DHT_get_stop (pr->gh); GNUNET_free (pr); return GNUNET_YES; } @@ -713,6 +727,10 @@ process_reply (void *cls, 1, GNUNET_NO); } + else + { + GSF_dht_lookup_ (pr); + } prq->priority += pr->public_data.original_priority; pr->public_data.priority = 0; pr->public_data.original_priority = 0; @@ -799,15 +817,15 @@ test_put_load_too_high (uint32_t priority) * @param size number of bytes in data * @param data pointer to the result data */ -void -GSF_handle_dht_reply_ (void *cls, - struct GNUNET_TIME_Absolute exp, - const GNUNET_HashCode *key, - const struct GNUNET_PeerIdentity * const *get_path, - const struct GNUNET_PeerIdentity * const *put_path, - enum GNUNET_BLOCK_Type type, - size_t size, - const void *data) +static void +handle_dht_reply (void *cls, + struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode *key, + const struct GNUNET_PeerIdentity * const *get_path, + const struct GNUNET_PeerIdentity * const *put_path, + enum GNUNET_BLOCK_Type type, + size_t size, + const void *data) { struct GSF_PendingRequest *pr = cls; struct ProcessReplyClosure prq; @@ -842,6 +860,247 @@ GSF_handle_dht_reply_ (void *cls, } +/** + * Consider looking up the data in the DHT (anonymity-level permitting). + * + * @param pr the pending request to process + */ +void +GSF_dht_lookup_ (struct GSF_PendingRequest *pr) +{ + const void *xquery; + size_t xquery_size; + struct GNUNET_PeerIdentity pi; + char buf[sizeof (GNUNET_HashCode) * 2]; + + if (0 != pr->public_data.anonymity_level) + return; + if (NULL != pr->gh) + { + GNUNET_DHT_get_stop (pr->gh); + pr->gh = NULL; + } + xquery = NULL; + xquery_size = 0; + if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type) + { + xquery = buf; + memcpy (buf, &pr->public_data.namespace, sizeof (GNUNET_HashCode)); + xquery_size = sizeof (GNUNET_HashCode); + } + if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY)) + { + GNUNET_PEER_resolve (pr->sender_pid, + &pi); + memcpy (&buf[xquery_size], &pi, sizeof (struct GNUNET_PeerIdentity)); + xquery_size += sizeof (struct GNUNET_PeerIdentity); + } + pr->gh = GNUNET_DHT_get_start (GSF_dht, + GNUNET_TIME_UNIT_FOREVER_REL, + pr->public_data.type, + &pr->public_data.query, + DEFAULT_GET_REPLICATION, + GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, + pr->bf, + pr->mingle, + xquery, + xquery_size, + &handle_dht_reply, + pr); +} + + +/** + * We're processing (local) results for a search request + * from another peer. Pass applicable results to the + * peer and if we are done either clean up (operation + * complete) or forward to other peers (more results possible). + * + * @param cls our closure (struct LocalGetContext) + * @param key key for the content + * @param size number of bytes in data + * @param data content stored + * @param type type of the content + * @param priority priority of the content + * @param anonymity anonymity-level for the content + * @param expiration expiration time for the content + * @param uid unique identifier for the datum; + * maybe 0 if no unique identifier is available + */ +static void +process_local_reply (void *cls, + const GNUNET_HashCode * key, + size_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute expiration, + uint64_t uid) +{ +#if FIXME + struct PendingRequest *pr = cls; + struct ProcessReplyClosure prq; + struct CheckDuplicateRequestClosure cdrc; + GNUNET_HashCode query; + unsigned int old_rf; + + if (NULL == key) + { +#if DEBUG_FS > 1 + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Done processing local replies, forwarding request to other peers.\n"); +#endif + pr->qe = NULL; + if (pr->client_request_list != NULL) + { + GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, + GNUNET_YES); + /* Figure out if this is a duplicate request and possibly + merge 'struct PendingRequest' entries */ + cdrc.have = NULL; + cdrc.pr = pr; + GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, + &pr->query, + &check_duplicate_request_client, + &cdrc); + if (cdrc.have != NULL) + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received request for block `%s' twice from client, will only request once.\n", + GNUNET_h2s (&pr->query)); +#endif + + destroy_pending_request (pr); + return; + } + } + if (pr->local_only == GNUNET_YES) + { + destroy_pending_request (pr); + return; + } + /* no more results */ + if (pr->task == GNUNET_SCHEDULER_NO_TASK) + pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task, + pr); + return; + } +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "New local response to `%s' of type %u.\n", + GNUNET_h2s (key), + type); +#endif + if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Found ONDEMAND block, performing on-demand encoding\n"); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# on-demand blocks matched requests"), + 1, + GNUNET_NO); + if (GNUNET_OK != + GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, + anonymity, expiration, uid, + &process_local_reply, + pr)) + if (pr->qe != NULL) + { + GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); + } + return; + } + old_rf = pr->results_found; + memset (&prq, 0, sizeof (prq)); + prq.data = data; + prq.expiration = expiration; + prq.size = size; + if (GNUNET_OK != + GNUNET_BLOCK_get_key (block_ctx, + type, + data, + size, + &query)) + { + GNUNET_break (0); + GNUNET_DATASTORE_remove (dsh, + key, + size, data, + -1, -1, + GNUNET_TIME_UNIT_FOREVER_REL, + NULL, NULL); + GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); + return; + } + prq.type = type; + prq.priority = priority; + prq.finished = GNUNET_NO; + prq.request_found = GNUNET_NO; + prq.anonymity_level = anonymity; + if ( (old_rf == 0) && + (pr->results_found == 0) ) + update_datastore_delays (pr->start_time); + process_reply (&prq, key, pr); + if (prq.finished == GNUNET_YES) + return; + if (pr->qe == NULL) + return; /* done here */ + if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST) + { + pr->local_only = GNUNET_YES; /* do not forward */ + GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); + return; + } + if ( (pr->client_request_list == NULL) && + ( (GNUNET_YES == test_get_load_too_high (0)) || + (pr->results_found > 5 + 2 * pr->priority) ) ) + { +#if DEBUG_FS > 2 + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Load too high, done with request\n"); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# processing result set cut short due to load"), + 1, + GNUNET_NO); + GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); + return; + } + GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); +#endif +} + + +/** + * Look up the request in the local datastore. + * + * @param pr the pending request to process + * @param cont function to call at the end + * @param cont_cls closure for cont + */ +void +GSF_local_lookup_ (struct GSF_PendingRequest *pr, + GSF_LocalLookupContinuation cont, + void *cont_cls) +{ + // FIXME: fix process_local_reply / cont! + GNUNET_assert (NULL == pr->gh); + pr->qe = GNUNET_DATASTORE_get (GSF_dsh, + &pr->public_data.query, + pr->public_data.type, + 1 /* queue priority */, + 1 /* max queue size */, + GNUNET_TIME_UNIT_FOREVER_REL, + &process_local_reply, + pr); +} + + + /** * Handle P2P "CONTENT" message. Checks that the message is * well-formed and then checks if there are any pending requests for diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h index b59cbc541..39a5fc77f 100644 --- a/src/fs/gnunet-service-fs_pr.h +++ b/src/fs/gnunet-service-fs_pr.h @@ -299,29 +299,38 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, /** - * Iterator called on each result obtained for a DHT - * operation that expects a reply + * Consider looking up the data in the DHT (anonymity-level permitting). * - * @param cls closure, the 'struct GSF_PendingRequest *'. - * @param exp when will this value expire - * @param key key of the result - * @param get_path NULL-terminated array of pointers - * to the peers on reverse GET path (or NULL if not recorded) - * @param put_path NULL-terminated array of pointers - * to the peers on the PUT path (or NULL if not recorded) - * @param type type of the result - * @param size number of bytes in data - * @param data pointer to the result data + * @param pr the pending request to process */ void -GSF_handle_dht_reply_ (void *cls, - struct GNUNET_TIME_Absolute exp, - const GNUNET_HashCode * key, - const struct GNUNET_PeerIdentity * const *get_path, - const struct GNUNET_PeerIdentity * const *put_path, - enum GNUNET_BLOCK_Type type, - size_t size, - const void *data); +GSF_dht_lookup_ (struct GSF_PendingRequest *pr); + + +/** + * Function to be called after we're done processing + * replies from the local lookup. + * + * @param cls closure + * @param pr the pending request we were processing + * @param result final datastore lookup result + */ +typedef void (GSF_LocalLookupContinuation)(void *cls, + struct GSF_PendingRequest *pr, + enum GNUNET_BLOCK_EvaluationResult result); + + +/** + * Look up the request in the local datastore. + * + * @param pr the pending request to process + * @param cont function to call at the end + * @param cont_cls closure for cont + */ +void +GSF_local_lookup_ (struct GSF_PendingRequest *pr, + GSF_LocalLookupContinuation cont, + void *cont_cls); /** -- 2.25.1