From: Christian Grothoff Date: Thu, 22 Nov 2012 18:27:12 +0000 (+0000) Subject: mostly finishing server-side for FS-over-stream X-Git-Tag: initial-import-from-subversion-38251~10729 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=220575cb7eb46b0b7cde05733ba86d6e2e7a9aa7;p=oweals%2Fgnunet.git mostly finishing server-side for FS-over-stream --- diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 3a8a076f6..125a14118 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -115,6 +115,11 @@ struct GNUNET_TIME_Relative GSF_avg_latency = { 500 }; */ double GSF_current_priorities; +/** + * Size of the datastore queue we assume for common requests. + */ +unsigned int GSF_datastore_queue_size; + /** * How many query messages have we received 'recently' that * have not yet been claimed as cover traffic? @@ -615,7 +620,18 @@ static void run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *cfg) { + unsigned long long dqs; + GSF_cfg = cfg; + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_size (GSF_cfg, "fs", "DATASTORE_QUEUE_SIZE", + &dqs)) + { + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO, + "fs", "DATASTORE_QUEUE_SIZE"); + dqs = 1024; + } + GSF_datastore_queue_size = (unsigned int) dqs; GSF_enable_randomized_delays = GNUNET_CONFIGURATION_get_value_yesno (cfg, "fs", "DELAY"); GSF_dsh = GNUNET_DATASTORE_connect (cfg); diff --git a/src/fs/gnunet-service-fs.h b/src/fs/gnunet-service-fs.h index d198d864d..3213712c8 100644 --- a/src/fs/gnunet-service-fs.h +++ b/src/fs/gnunet-service-fs.h @@ -257,6 +257,12 @@ extern struct GNUNET_BLOCK_Context *GSF_block_ctx; */ extern int GSF_enable_randomized_delays; +/** + * Size of the datastore queue we assume for common requests. + */ +extern unsigned int GSF_datastore_queue_size; + + /** * Test if the DATABASE (GET) load on this peer is too high * to even consider processing the query at diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index 349232daf..9233658eb 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -195,12 +195,6 @@ static struct GNUNET_LOAD_Value *datastore_put_load; static int active_to_migration; -/** - * Size of the datastore queue we assume for common requests. - * Determined based on the network quota. - */ -static unsigned int datastore_queue_size; - /** * Heap with the request that will expire next at the top. Contains * pointers of type "struct PendingRequest*"; these will *also* be @@ -1307,7 +1301,7 @@ process_local_reply (void *cls, const struct GNUNET_HashCode * key, size_t size, (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) ? UINT_MAX : - datastore_queue_size + GSF_datastore_queue_size /* max queue size */ , GNUNET_TIME_UNIT_FOREVER_REL, &process_local_reply, pr); @@ -1347,7 +1341,7 @@ process_local_reply (void *cls, const struct GNUNET_HashCode * key, size_t size, (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) ? UINT_MAX : - datastore_queue_size + GSF_datastore_queue_size /* max queue size */ , GNUNET_TIME_UNIT_FOREVER_REL, &process_local_reply, pr); @@ -1405,7 +1399,7 @@ process_local_reply (void *cls, const struct GNUNET_HashCode * key, size_t size, (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr-> public_data.options)) ? UINT_MAX : - datastore_queue_size + GSF_datastore_queue_size /* max queue size */ , GNUNET_TIME_UNIT_FOREVER_REL, &process_local_reply, pr); @@ -1487,7 +1481,7 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr, (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr-> public_data.options)) ? UINT_MAX : - datastore_queue_size + GSF_datastore_queue_size /* max queue size */ , GNUNET_TIME_UNIT_FOREVER_REL, &process_local_reply, pr); @@ -1639,8 +1633,6 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, void GSF_pending_request_init_ () { - unsigned long long dqs; - if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (GSF_cfg, "fs", "MAX_PENDING_REQUESTS", @@ -1649,16 +1641,6 @@ GSF_pending_request_init_ () GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO, "fs", "MAX_PENDING_REQUESTS"); } - if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_size (GSF_cfg, "fs", "DATASTORE_QUEUE_SIZE", - &dqs)) - { - GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO, - "fs", "DATASTORE_QUEUE_SIZE"); - dqs = 1024; - } - datastore_queue_size = (unsigned int) dqs; - active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING"); datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); diff --git a/src/fs/gnunet-service-fs_stream.c b/src/fs/gnunet-service-fs_stream.c index befa90dd5..f49b4246e 100644 --- a/src/fs/gnunet-service-fs_stream.c +++ b/src/fs/gnunet-service-fs_stream.c @@ -24,6 +24,7 @@ * @author Christian Grothoff * * TODO: + * - add statistics * - limit # concurrent clients, timeout for read */ #include "platform.h" @@ -33,6 +34,7 @@ #include "gnunet_protocols.h" #include "gnunet_applications.h" #include "gnunet-service-fs.h" +#include "gnunet-service-fs_indexing.h" #include "gnunet-service-fs_stream.h" /** @@ -65,6 +67,16 @@ struct StreamClient */ struct GNUNET_STREAM_IOWriteHandle *wh; + /** + * Tokenizer for requests. + */ + struct GNUNET_SERVER_MessageStreamTokenizer *mst; + + /** + * Current active request to the datastore, if we have one pending. + */ + struct GNUNET_DATASTORE_QueueEntry *qe; + /** * Size of the last write that was initiated. */ @@ -73,6 +85,56 @@ struct StreamClient }; +/** + * Query from one peer, asking the other for CHK-data. + */ +struct StreamQueryMessage +{ + + /** + * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY. + */ + struct GNUNET_MessageHeader header; + + /** + * Block type must be DBLOCK or IBLOCK. + */ + uint32_t type; + + /** + * Query hash from CHK (hash of encrypted block). + */ + struct GNUNET_HashCode query; + +}; + + +/** + * Reply to a StreamQueryMessage. + */ +struct StreamReplyMessage +{ + + /** + * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY. + */ + struct GNUNET_MessageHeader header; + + /** + * Block type must be DBLOCK or IBLOCK. + */ + uint32_t type; + + /** + * Expiration time for the block. + */ + struct GNUNET_TIME_AbsoluteNBO expiration; + + /* followed by the encrypted block */ + +}; + + /** * Listen socket for incoming requests. */ @@ -101,6 +163,9 @@ terminate_stream (struct StreamClient *sc) GNUNET_STREAM_io_read_cancel (sc->rh); if (NULL != sc->wh) GNUNET_STREAM_io_write_cancel (sc->wh); + if (NULL != sc->qe) + GNUNET_DATASTORE_cancel (sc->qe); + GNUNET_SERVER_mst_destroy (sc->mst); GNUNET_STREAM_close (sc->socket); GNUNET_CONTAINER_DLL_remove (sc_head, sc_tail, @@ -109,6 +174,48 @@ terminate_stream (struct StreamClient *sc) } +/** + * Functions of this signature are called whenever data is available from the + * stream. + * + * @param cls the closure from GNUNET_STREAM_read + * @param status the status of the stream at the time this function is called + * @param data traffic from the other side + * @param size the number of bytes available in data read; will be 0 on timeout + * @return number of bytes of processed from 'data' (any data remaining should be + * given to the next time the read processor is called). + */ +static size_t +process_request (void *cls, + enum GNUNET_STREAM_Status status, + const void *data, + size_t size); + + +/** + * We're done handling a request from a client, read the next one. + * + * @param sc client to continue reading requests from + */ +static void +continue_reading (struct StreamClient *sc) +{ + int ret; + + ret = + GNUNET_SERVER_mst_receive (sc->mst, + NULL, + NULL, 0, + GNUNET_NO, GNUNET_YES); + if (GNUNET_NO == ret) + return; + sc->rh = GNUNET_STREAM_read (sc->socket, + GNUNET_TIME_UNIT_FOREVER_REL, + &process_request, + sc); +} + + /** * Functions of this signature are called whenever data is available from the * stream. @@ -127,12 +234,25 @@ process_request (void *cls, size_t size) { struct StreamClient *sc = cls; + int ret; sc->rh = NULL; switch (status) { case GNUNET_STREAM_OK: - // fixme: handle request... + ret = + GNUNET_SERVER_mst_receive (sc->mst, + NULL, + data, size, + GNUNET_NO, GNUNET_YES); + if (GNUNET_NO == ret) + return size; /* more messages in MST */ + if (GNUNET_SYSERR == ret) + { + GNUNET_break_op (0); + terminate_stream (sc); + return size; + } break; case GNUNET_STREAM_TIMEOUT: case GNUNET_STREAM_SHUTDOWN: @@ -144,14 +264,151 @@ process_request (void *cls, GNUNET_break (0); return size; } - sc->rh = GNUNET_STREAM_read (sc->socket, - GNUNET_TIME_UNIT_FOREVER_REL, - &process_request, - sc); + continue_reading (sc); return size; } +/** + * Sending a reply was completed, continue processing. + * + * @param cls closure with the struct StreamClient which sent the query + */ +static void +write_continuation (void *cls, + enum GNUNET_STREAM_Status status, + size_t size) +{ + struct StreamClient *sc = cls; + + sc->wh = NULL; + if ( (GNUNET_STREAM_OK == status) && + (size == sc->reply_size) ) + continue_reading (sc); + else + terminate_stream (sc); +} + + +/** + * Process a datum that was stored in the datastore. + * + * @param cls closure with the struct StreamClient which sent the query + * @param key key for the content + * @param size number of bytes in data + * @param data content stored + * @param type type of the content + * @param priority priority of the content + * @param anonymity anonymity-level for the content + * @param expiration expiration time for the content + * @param uid unique identifier for the datum; + * maybe 0 if no unique identifier is available + */ +static void +handle_datastore_reply (void *cls, + const struct GNUNET_HashCode * key, + size_t size, const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute + expiration, uint64_t uid) +{ + struct StreamClient *sc = cls; + size_t msize = size + sizeof (struct StreamReplyMessage); + char buf[msize] GNUNET_ALIGN; + struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf; + + sc->qe = NULL; + if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type) + { + if (GNUNET_OK != + GNUNET_FS_handle_on_demand_block (key, + size, data, type, + priority, anonymity, + expiration, uid, + &handle_datastore_reply, + sc)) + { + continue_reading (sc); + } + return; + } + if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); + continue_reading (sc); + return; + } + srm->header.size = htons ((uint16_t) msize); + srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY); + srm->type = htonl (type); + srm->expiration = GNUNET_TIME_absolute_hton (expiration); + memcpy (&srm[1], data, size); + sc->reply_size = msize; + sc->wh = GNUNET_STREAM_write (sc->socket, + buf, msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &write_continuation, + sc); + if (NULL == sc->wh) + { + terminate_stream (sc); + return; + } +} + + +/** + * Functions with this signature are called whenever a + * complete message is received. + * + * Do not call GNUNET_SERVER_mst_destroy in callback + * + * @param cls closure with the 'struct StreamClient' + * @param client identification of the client, NULL + * @param message the actual message + * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing + */ +static int +request_cb (void *cls, + void *client, + const struct GNUNET_MessageHeader *message) +{ + struct StreamClient *sc = cls; + const struct StreamQueryMessage *sqm; + + switch (ntohs (message->type)) + { + case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY: + if (sizeof (struct StreamQueryMessage) != + ntohs (message->size)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + sqm = (const struct StreamQueryMessage *) message; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received query for `%s' via stream\n", + GNUNET_h2s (&sqm->query)); + sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, + 0, + &sqm->query, + ntohl (sqm->type), + 0 /* FIXME: priority */, + GSF_datastore_queue_size, + GNUNET_TIME_UNIT_FOREVER_REL, + &handle_datastore_reply, sc); + if (NULL == sc->qe) + continue_reading (sc); + return GNUNET_OK; + default: + GNUNET_break_op (0); + return GNUNET_SYSERR; + } +} + + /** * Functions of this type are called upon new stream connection from other peers * or upon binding error which happen when the app_port given in @@ -175,6 +432,8 @@ accept_cb (void *cls, return GNUNET_SYSERR; sc = GNUNET_malloc (sizeof (struct StreamClient)); sc->socket = socket; + sc->mst = GNUNET_SERVER_mst_create (&request_cb, + sc); sc->rh = GNUNET_STREAM_read (sc->socket, GNUNET_TIME_UNIT_FOREVER_REL, &process_request, diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 2f84b0ade..12f8ab886 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -483,6 +483,16 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP 139 +/** + * P2P request for content (one FS to another via a stream). + */ +#define GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY 140 + +/** + * P2P answer for content (one FS to another via a stream). + */ +#define GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY 141 + /******************************************************************************* * DHT message types