From 92b434b649de5bc75628eff46f17db18104a4dd8 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 11 Sep 2009 14:22:19 +0000 Subject: [PATCH] more FS coding --- src/fs/fs_uri.c | 4 +- src/fs/gnunet-service-fs.c | 490 +++++++++++++++++++++++++++++++++++-- src/upnp/upnp_util.h | 2 +- 3 files changed, 469 insertions(+), 27 deletions(-) diff --git a/src/fs/fs_uri.c b/src/fs/fs_uri.c index 4f4a82c2c..1c0686ed9 100644 --- a/src/fs/fs_uri.c +++ b/src/fs/fs_uri.c @@ -1481,7 +1481,9 @@ gather_uri_data (void *cls, * Construct a keyword-URI from meta-data (take all entries * in the meta-data and construct one large keyword URI * that lists all keywords that can be found in the meta-data). - * @deprecated + * + * @param md metadata to use + * @return NULL on error, otherwise a KSK URI */ struct GNUNET_FS_Uri * GNUNET_FS_uri_ksk_create_from_meta_data (const struct GNUNET_CONTAINER_MetaData *md) diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index bf64f73f1..b85a63d4b 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -22,6 +22,13 @@ * @file fs/gnunet-service-fs.c * @brief program that provides the file-sharing service * @author Christian Grothoff + * + * TODO: + * - INDEX_START handling + * - INDEX_LIST handling + * - UNINDEX handling + * - bloomfilter support (GET, CS-request with BF, etc.) + * - all P2P messages */ #include "platform.h" #include "gnunet_protocols.h" @@ -30,8 +37,21 @@ #include "gnunet_util_lib.h" #include "fs.h" +/** + * Our connection to the datastore. + */ static struct GNUNET_DATASTORE_Handle *dsh; +/** + * Our scheduler. + */ +static struct GNUNET_SCHEDULER_Handle *sched; + +/** + * Our configuration. + */ +const struct GNUNET_CONFIGURATION_Handle *cfg; + /** * Handle INDEX_START-message. * @@ -141,7 +161,8 @@ handle_unindex (void *cls, /** - * FIXME + * Signature of a function that is called whenever a datastore + * request can be processed (or an entry put on the queue times out). * * @param cls closure * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout @@ -150,6 +171,56 @@ typedef void (*RequestFunction)(void *cls, int ok); +/** + * Doubly-linked list of our requests for the datastore. + */ +struct DatastoreRequestQueue +{ + + /** + * This is a doubly-linked list. + */ + struct DatastoreRequestQueue *next; + + /** + * This is a doubly-linked list. + */ + struct DatastoreRequestQueue *prev; + + /** + * Function to call (will issue the request). + */ + RequestFunction req; + + /** + * Closure for req. + */ + void *req_cls; + + /** + * When should this request time-out because we don't care anymore? + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * ID of task used for signaling timeout. + */ + GNUNET_SCHEDULER_TaskIdentifier task; + +}; + + +/** + * Head of request queue for the datastore, sorted by timeout. + */ +static struct DatastoreRequestQueue *drq_head; + +/** + * Tail of request queue for the datastore. + */ +static struct DatastoreRequestQueue *drq_tail; + + /** * Run the next DS request in our * queue, we're done with the current one. @@ -157,59 +228,397 @@ typedef void (*RequestFunction)(void *cls, static void next_ds_request () { + struct DatastoreRequestQueue *e; + + while (NULL != (e = drq_head)) + { + if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value) + break; + if (e->task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (sched, e->task); + GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); + e->req (e->req_cls, GNUNET_NO); + GNUNET_free (e); + } + if (e == NULL) + return; + if (e->task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (sched, e->task); + e->task = GNUNET_SCHEDULER_NO_TASK; + e->req (e->req_cls, GNUNET_YES); + GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); + GNUNET_free (e); } /** - * FIXME. + * A datastore request had to be timed out. + * + * @param cls closure (of type "struct DatastoreRequestQueue*") + * @param tc task context, unused */ static void +timeout_ds_request (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct DatastoreRequestQueue *e = cls; + + e->task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); + e->req (e->req_cls, GNUNET_NO); + GNUNET_free (e); +} + + +/** + * Queue a request for the datastore. + * + * @param deadline by when the request should run + * @param fun function to call once the request can be run + * @param fun_cls closure for fun + */ +static struct DatastoreRequestQueue * queue_ds_request (struct GNUNET_TIME_Relative deadline, RequestFunction fun, void *fun_cls) { -} + struct DatastoreRequestQueue *e; + struct DatastoreRequestQueue *bef; + if (drq_head == NULL) + { + /* no other requests pending, run immediately */ + fun (fun_cls, GNUNET_OK); + return NULL; + } + e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue)); + e->timeout = GNUNET_TIME_relative_to_absolute (deadline); + e->req = fun; + e->req_cls = fun_cls; + if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value) + { + /* local request, highest prio, put at head of queue + regardless of deadline */ + bef = NULL; + } + else + { + bef = drq_tail; + while ( (NULL != bef) && + (e->timeout.value < bef->timeout.value) ) + bef = bef->prev; + } + GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e); + if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value) + return e; + e->task = GNUNET_SCHEDULER_add_delayed (sched, + GNUNET_NO, + GNUNET_SCHEDULER_PRIORITY_BACKGROUND, + GNUNET_SCHEDULER_NO_TASK, + deadline, + &timeout_ds_request, + e); + return e; +} /** - * Closure for processing START_SEARCH - * messages from a client. + * Closure for processing START_SEARCH messages from a client. */ struct LocalGetContext { + + /** + * This is a doubly-linked list. + */ + struct LocalGetContext *next; + + /** + * This is a doubly-linked list. + */ + struct LocalGetContext *prev; + /** * Client that initiated the search. */ struct GNUNET_SERVER_Client *client; + /** + * Array of results that we've already received + * (can be NULL). + */ + GNUNET_HashCode *results; + /** + * Bloomfilter over all results (for fast query construction); + * NULL if we don't have any results. + */ + struct GNUNET_CONTAINER_BloomFilter *results_bf; + + /** + * DS request associated with this operation. + */ + struct DatastoreRequestQueue *req; + + /** + * Current result message to transmit to client (or NULL). + */ + struct ContentMessage *result; + + /** + * Type of the content that we're looking for. + * 0 for any. + */ + uint32_t type; + + /** + * Desired anonymity level. + */ + uint32_t anonymity_level; + + /** + * Number of results actually stored in the results array. + */ + unsigned int results_used; + /** + * Size of the results array in memory. + */ + unsigned int results_size; + + /** + * If the request is for a DBLOCK or IBLOCK, this is the identity of + * the peer that is known to have a response. Set to all-zeros if + * such a target is not known (note that even if OUR anonymity + * level is >0 we may happen to know the responder's identity; + * nevertheless, we should probably not use it for a DHT-lookup + * or similar blunt actions in order to avoid exposing ourselves). + *

+ * If the request is for an SBLOCK, this is the identity of the + * pseudonym to which the SBLOCK belongs. + *

+ * If the request is for a KBLOCK, "target" must be all zeros. + */ + GNUNET_HashCode target; + + /** + * Hash of the keyword (aka query) for KBLOCKs; Hash of + * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query) + * and hash of the identifier XORed with the target for + * SBLOCKS (aka query). + */ + GNUNET_HashCode query; + }; -static void +/** + * Head of doubly-linked LGC list. + */ +static struct LocalGetContext *lgc_head; + +/** + * Tail of doubly-linked LGC list. + */ +static struct LocalGetContext *lgc_tail; + + +/** + * Free the state associated with a local get context. + * + * @param lgc the lgc to free + */ +static void +local_get_context_free (struct LocalGetContext *lgc) +{ + GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc); + GNUNET_SERVER_client_drop (lgc->client); + GNUNET_free_non_null (lgc->results); + if (lgc->results_bf != NULL) + GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf); + if (lgc->req != NULL) + { + if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK) + GNUNET_SCHEDULER_cancel (sched, lgc->req->task); + GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc); + GNUNET_free (lgc->req); + } + GNUNET_free (lgc); +} + + +/** + * We're able to transmit the next (local) result to the client. + * Do it and ask the datastore for more. Or, on error, tell + * the datastore to stop giving us more. + * + * @param cls our closure (struct LocalGetContext) + * @param max maximum number of bytes we can transmit + * @param buf where to copy our message + * @return number of bytes copied to buf + */ +static size_t +transmit_local_result (void *cls, + size_t max, + void *buf) +{ + struct LocalGetContext *lgc = cls; + uint16_t msize; + + if (NULL == buf) + { + /* error, abort! */ + GNUNET_free (lgc->result); + lgc->result = NULL; + GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); + return 0; + } + msize = ntohs (lgc->result->header.size); + GNUNET_assert (max >= msize); + memcpy (buf, lgc->result, msize); + GNUNET_free (lgc->result); + lgc->result = NULL; + GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); + return msize; +} + + +/** + * We're processing (local) results for a search request + * from a (local) client. Pass applicable results to the + * client and if we are done either clean up (operation + * complete) or switch to P2P search (more results possible). + * + * @param cls our closure (struct LocalGetContext) + * @param key key for the content + * @param size number of bytes in data + * @param data content stored + * @param type type of the content + * @param priority priority of the content + * @param anonymity anonymity-level for the content + * @param expiration expiration time for the content + * @param uid unique identifier for the datum; + * maybe 0 if no unique identifier is available + */ +static void +process_local_get_result (void *cls, + const GNUNET_HashCode * key, + uint32_t size, + const void *data, + uint32_t type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute + expiration, + uint64_t uid) +{ + struct LocalGetContext *lgc = cls; + size_t msize; + + if (key == NULL) + { + /* no further results from datastore; continue + processing further requests from the client and + allow the next task to use the datastore; also, + switch to P2P requests or clean up our state. */ + next_ds_request (); + GNUNET_SERVER_receive_done (lgc->client, + GNUNET_OK); + if ( (lgc->results_used == 0) || + (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) || + (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) || + (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) + { + // FIXME: initiate P2P search + return; + } + /* got all possible results, clean up! */ + local_get_context_free (lgc); + return; + } + if (lgc->results_used == lgc->results_size) + { + GNUNET_array_grow (lgc->results, + lgc->results_size, + lgc->results_size * 2 + 2); + if ( (lgc->type != GNUNET_DATASTORE_BLOCKTYPE_DBLOCK) || + (lgc->type != GNUNET_DATASTORE_BLOCKTYPE_IBLOCK) ) + { + // FIXME: possibly grow/create BF! + } + } + GNUNET_CRYPTO_hash (data, + size, + &lgc->results[lgc->results_used++]); + if ( (lgc->type != GNUNET_DATASTORE_BLOCKTYPE_DBLOCK) || + (lgc->type != GNUNET_DATASTORE_BLOCKTYPE_IBLOCK) ) + { + // FIXME: add result to BF! + } + msize = size + sizeof (struct ContentMessage); + GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); + lgc->result = GNUNET_malloc (msize); + lgc->result->header.size = htons (msize); + lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT); + lgc->result->type = htonl (type); + lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration); + memcpy (&lgc->result[1], + data, + size); + GNUNET_SERVER_notify_transmit_ready (lgc->client, + msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_local_result, + lgc); +} + + +/** + * We're processing a search request from a local + * client. Now it is our turn to query the datastore. + * + * @param cls our closure (struct LocalGetContext) + * @param tc unused + */ +static void transmit_local_get (void *cls, - int ok) + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct LocalGetContext *lgc = cls; - // FIXME: search locally - GNUNET_assert (GNUNET_OK == ok); - GNUNET_SERVER_receive_done (lgc->client, - GNUNET_OK); + GNUNET_DATASTORE_get (dsh, + &lgc->query, + lgc->type, + &process_local_get_result, + lgc, + GNUNET_TIME_UNIT_FOREVER_REL); +} - // once we're done processing the DS reply, do: - next_ds_request (); - // FIXME: if not found, initiate P2P search - // FIXME: once done with "client" handle: - GNUNET_SERVER_client_drop (lgc->client); +/** + * We're processing a search request from a local + * client. Now it is our turn to query the datastore. + * + * @param cls our closure (struct LocalGetContext) + * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK + */ +static void +transmit_local_get_ready (void *cls, + int ok) +{ + struct LocalGetContext *lgc = cls; + + GNUNET_assert (GNUNET_OK == ok); + GNUNET_SCHEDULER_add_continuation (sched, + GNUNET_NO, + &transmit_local_get, + lgc, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); } /** - * Handle START_SEARCH-message. + * Handle START_SEARCH-message (search request from client). * * @param cls closure * @param client identification of the client @@ -227,10 +636,14 @@ handle_start_search (void *cls, GNUNET_SERVER_client_keep (client); lgc = GNUNET_malloc (sizeof (struct LocalGetContext)); lgc->client = client; - // lgc->x = y; - queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_local_get, - lgc); + lgc->type = ntohl (sm->type); + lgc->anonymity_level = ntohl (sm->anonymity_level); + lgc->target = sm->target; + lgc->query = sm->query; + GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc); + lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_local_get_ready, + lgc); } @@ -251,6 +664,29 @@ static struct GNUNET_SERVER_MessageHandler handlers[] = { }; +/** + * A client disconnected. Remove all of its pending queries. + * + * @param cls closure, NULL + * @param client identification of the client + */ +static void +handle_client_disconnect (void *cls, + struct GNUNET_SERVER_Client + * client) +{ + struct LocalGetContext *lgc; + + lgc = lgc_head; + while ( (NULL != lgc) && + (lgc->client != client) ) + lgc = lgc->next; + if (lgc == NULL) + return; /* not one of our clients */ + local_get_context_free (lgc); +} + + /** * Task run during shutdown. * @@ -277,10 +713,12 @@ shutdown_task (void *cls, */ static void run (void *cls, - struct GNUNET_SCHEDULER_Handle *sched, + struct GNUNET_SCHEDULER_Handle *s, struct GNUNET_SERVER_Handle *server, - const struct GNUNET_CONFIGURATION_Handle *cfg) + const struct GNUNET_CONFIGURATION_Handle *c) { + sched = s; + cfg = c; dsh = GNUNET_DATASTORE_connect (cfg, sched); if (NULL == dsh) @@ -289,9 +727,11 @@ run (void *cls, _("Failed to connect to datastore service.\n")); return; } + GNUNET_SERVER_disconnect_notify (server, + &handle_client_disconnect, + NULL); GNUNET_SERVER_add_handlers (server, handlers); - // FIXME: also handle P2P messages! - + // FIXME: also register with core to handle P2P messages! GNUNET_SCHEDULER_add_delayed (sched, GNUNET_YES, GNUNET_SCHEDULER_PRIORITY_IDLE, diff --git a/src/upnp/upnp_util.h b/src/upnp/upnp_util.h index 32c790493..1c7555b09 100644 --- a/src/upnp/upnp_util.h +++ b/src/upnp/upnp_util.h @@ -1,5 +1,5 @@ /** - * @file transport/upnp_util.h Utility Functions + * @file upnp/upnp_util.h Utility Functions * @ingroup core * * gaim -- 2.25.1