From 4ef4905497e8b34e907649cbe93568ea5f46c83d Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 22 Nov 2012 19:57:49 +0000 Subject: [PATCH] towards using stream for non-anonymous file-sharing --- src/fs/gnunet-service-fs_pr.c | 6 +- src/fs/gnunet-service-fs_stream.c | 577 +++++++++++++++++++++++++++++- src/fs/gnunet-service-fs_stream.h | 49 +++ 3 files changed, 625 insertions(+), 7 deletions(-) diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index 9233658eb..f89abdb72 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -262,15 +262,15 @@ refresh_bloomfilter (struct GSF_PendingRequest *pr) struct GSF_PendingRequest * GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, enum GNUNET_BLOCK_Type type, - const struct GNUNET_HashCode * query, - const struct GNUNET_HashCode * namespace, + const struct GNUNET_HashCode *query, + const struct GNUNET_HashCode *namespace, const struct GNUNET_PeerIdentity *target, const char *bf_data, size_t bf_size, uint32_t mingle, uint32_t anonymity_level, uint32_t priority, int32_t ttl, GNUNET_PEER_Id sender_pid, GNUNET_PEER_Id origin_pid, - const struct GNUNET_HashCode * replies_seen, + const struct GNUNET_HashCode *replies_seen, unsigned int replies_seen_count, GSF_PendingRequestReplyHandler rh, void *rh_cls) { diff --git a/src/fs/gnunet-service-fs_stream.c b/src/fs/gnunet-service-fs_stream.c index f49b4246e..856a21a1a 100644 --- a/src/fs/gnunet-service-fs_stream.c +++ b/src/fs/gnunet-service-fs_stream.c @@ -24,8 +24,8 @@ * @author Christian Grothoff * * TODO: - * - add statistics - * - limit # concurrent clients, timeout for read + * - limit # concurrent clients, have timeouts for server-side + * - stream shutdown in callbacks from stream may not always work right now (check with stream_api!) */ #include "platform.h" #include "gnunet_constants.h" @@ -135,6 +135,126 @@ struct StreamReplyMessage }; +/** + * Handle for a stream to another peer. + */ +struct StreamHandle; + + +/** + * Handle for a request that is going out via stream API. + */ +struct GSF_StreamRequest +{ + + /** + * DLL. + */ + struct GSF_StreamRequest *next; + + /** + * DLL. + */ + struct GSF_StreamRequest *prev; + + /** + * Which stream is this request associated with? + */ + struct StreamHandle *sh; + + /** + * Function to call with the result. + */ + GSF_StreamReplyProcessor proc; + + /** + * Closure for 'proc' + */ + void *proc_cls; + + /** + * Query to transmit to the other peer. + */ + struct GNUNET_HashCode query; + + /** + * Desired type for the reply. + */ + enum GNUNET_BLOCK_Type type; + + /** + * Did we transmit this request already? YES if we are + * in the 'waiting' DLL, NO if we are in the 'pending' DLL. + */ + int was_transmitted; +}; + + +/** + * Handle for a stream to another peer. + */ +struct StreamHandle +{ + /** + * Head of DLL of pending requests on this stream. + */ + struct GSF_StreamRequest *pending_head; + + /** + * Tail of DLL of pending requests on this stream. + */ + struct GSF_StreamRequest *pending_tail; + + /** + * Head of DLL of requests waiting for a reply on this stream. + */ + struct GSF_StreamRequest *waiting_head; + + /** + * Tail of DLL of requests waiting for a reply on this stream. + */ + struct GSF_StreamRequest *waiting_tail; + + /** + * Connection to the other peer. + */ + struct GNUNET_STREAM_Socket *stream; + + /** + * Handle for active read operation, or NULL. + */ + struct GNUNET_STREAM_IOReadHandle *rh; + + /** + * Handle for active write operation, or NULL. + */ + struct GNUNET_STREAM_IOWriteHandle *wh; + + /** + * Tokenizer for replies. + */ + struct GNUNET_SERVER_MessageStreamTokenizer *mst; + + /** + * Which peer does this stream go to? + */ + struct GNUNET_PeerIdentity target; + + /** + * Task to kill inactive streams (we keep them around for + * a few seconds to give the application a chance to give + * us another query). + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; + + /** + * Is this stream ready for transmission? + */ + int is_ready; + +}; + + /** * Listen socket for incoming requests. */ @@ -150,6 +270,415 @@ static struct StreamClient *sc_head; */ static struct StreamClient *sc_tail; +/** + * Map from peer identities to 'struct StreamHandles' with streams to + * those peers. + */ +static struct GNUNET_CONTAINER_MultiHashMap *stream_map; + + +/* ********************* client-side code ************************* */ + + +/** + * Destroy a stream handle. + * + * @param sh stream to process + */ +static void +destroy_stream_handle (struct StreamHandle *sh) +{ + struct GSF_StreamRequest *sr; + + while (NULL != (sr = sh->pending_head)) + { + sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, + GNUNET_TIME_UNIT_FOREVER_ABS, + 0, NULL); + GSF_stream_query_cancel (sr); + } + while (NULL != (sr = sh->waiting_head)) + { + sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, + GNUNET_TIME_UNIT_FOREVER_ABS, + 0, NULL); + GSF_stream_query_cancel (sr); + } + if (NULL != sh->wh) + GNUNET_STREAM_io_write_cancel (sh->wh); + if (NULL != sh->rh) + GNUNET_STREAM_io_read_cancel (sh->rh); + if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task) + GNUNET_SCHEDULER_cancel (sh->timeout_task); + GNUNET_STREAM_close (sh->stream); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_remove (stream_map, + &sh->target.hashPubKey, + sh)); + GNUNET_free (sh); +} + + +/** + * Transmit pending requests via the stream. + * + * @param sh stream to process + */ +static void +transmit_pending (struct StreamHandle *sh); + + +/** + * Function called once the stream is ready for transmission. + * + * @param cls the 'struct StreamHandle' + * @param socket stream socket handle + */ +static void +stream_ready_cb (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + struct StreamHandle *sh = cls; + + sh->is_ready = GNUNET_YES; + transmit_pending (sh); +} + + +/** + * We had a serious error, tear down and re-create stream from scratch. + * + * @param sh stream to reset + */ +static void +reset_stream (struct StreamHandle *sh) +{ + struct GSF_StreamRequest *sr; + + if (NULL != sh->rh) + GNUNET_STREAM_io_read_cancel (sh->rh); + GNUNET_STREAM_close (sh->stream); + sh->is_ready = GNUNET_NO; + while (NULL != (sr = sh->waiting_tail)) + { + GNUNET_CONTAINER_DLL_remove (sh->waiting_head, + sh->waiting_tail, + sr); + GNUNET_CONTAINER_DLL_insert (sh->pending_head, + sh->pending_tail, + sr); + sr->was_transmitted = GNUNET_NO; + } + sh->stream = GNUNET_STREAM_open (GSF_cfg, + &sh->target, + GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, + &stream_ready_cb, sh, + GNUNET_STREAM_OPTION_END); +} + + +/** + * We got a reply from the stream. Process it. + * + * @param cls the struct StreamHandle + * @param status the status of the stream at the time this function is called + * @param data traffic from the other side + * @param size the number of bytes available in data read; will be 0 on timeout + * @return number of bytes of processed from 'data' (any data remaining should be + * given to the next time the read processor is called). + */ +static size_t +handle_stream_reply (void *cls, + enum GNUNET_STREAM_Status status, + const void *data, + size_t size) +{ + struct StreamHandle *sh = cls; + + sh->rh = NULL; + if (GNUNET_SYSERR == + GNUNET_SERVER_mst_receive (sh->mst, + NULL, + data, size, + GNUNET_NO, GNUNET_NO)) + { + GNUNET_break_op (0); + reset_stream (sh); + return size; + } + sh->rh = GNUNET_STREAM_read (sh->stream, + GNUNET_TIME_UNIT_FOREVER_REL, + &handle_stream_reply, + sh); + return size; +} + + +/** + * Functions of this signature are called whenever we transmitted a + * query via a stream. + * + * @param cls the struct StreamHandle for which we did the write call + * @param status the status of the stream at the time this function is called; + * GNUNET_OK if writing to stream was completed successfully, + * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the + * mean time. + * @param size the number of bytes written + */ +static void +query_write_continuation (void *cls, + enum GNUNET_STREAM_Status status, + size_t size) +{ + struct StreamHandle *sh = cls; + + sh->wh = NULL; + if ( (GNUNET_STREAM_OK != status) || + (sizeof (struct StreamQueryMessage) != size) ) + { + reset_stream (sh); + return; + } + if (NULL == sh->rh) + sh->rh = GNUNET_STREAM_read (sh->stream, + GNUNET_TIME_UNIT_FOREVER_REL, + &handle_stream_reply, + sh); + transmit_pending (sh); +} + + +/** + * Transmit pending requests via the stream. + * + * @param sh stream to process + */ +static void +transmit_pending (struct StreamHandle *sh) +{ + struct StreamQueryMessage sqm; + struct GSF_StreamRequest *sr; + + if (NULL != sh->wh) + return; + sr = sh->pending_head; + if (NULL == sr) + return; + GNUNET_CONTAINER_DLL_remove (sh->pending_head, + sh->pending_tail, + sr); + GNUNET_CONTAINER_DLL_insert_tail (sh->waiting_head, + sh->waiting_tail, + sr); + sr->was_transmitted = GNUNET_YES; + sqm.header.size = htons (sizeof (sqm)); + sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY); + sqm.type = htonl (sr->type); + sqm.query = sr->query; + sh->wh = GNUNET_STREAM_write (sh->stream, + &sqm, sizeof (sqm), + GNUNET_TIME_UNIT_FOREVER_REL, + &query_write_continuation, + sh); +} + + +/** + * Functions with this signature are called whenever a + * complete reply is received. + * + * Do not call GNUNET_SERVER_mst_destroy in callback + * + * @param cls closure with the 'struct StreamHandle' + * @param client identification of the client, NULL + * @param message the actual message + * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing + */ +static int +reply_cb (void *cls, + void *client, + const struct GNUNET_MessageHeader *message) +{ + struct StreamHandle *sh = cls; + const struct StreamReplyMessage *srm; + uint16_t msize; + enum GNUNET_BLOCK_Type type; + struct GNUNET_HashCode query; + struct GSF_StreamRequest *sr; + + msize = ntohs (message->size); + switch (ntohs (message->type)) + { + case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY: + if (sizeof (struct StreamReplyMessage) > msize) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + srm = (const struct StreamReplyMessage *) message; + msize -= sizeof (struct StreamReplyMessage); + type = (enum GNUNET_BLOCK_Type) ntohl (srm->type); + if (GNUNET_YES != + GNUNET_BLOCK_get_key (GSF_block_ctx, + type, + &srm[1], msize, &query)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received reply `%s' via stream\n", + GNUNET_h2s (&query)); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# replies received via stream"), 1, + GNUNET_NO); + for (sr = sh->waiting_head; NULL != sr; sr = sr->next) + if (0 == memcmp (&query, + &sr->query, + sizeof (struct GNUNET_HashCode))) + break; + if (NULL == sr) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# replies received via stream dropped"), 1, + GNUNET_NO); + return GNUNET_OK; + } + sr->proc (sr->proc_cls, + type, + GNUNET_TIME_absolute_ntoh (srm->expiration), + msize, + &srm[1]); + GSF_stream_query_cancel (sr); + return GNUNET_OK; + default: + GNUNET_break_op (0); + return GNUNET_SYSERR; + } +} + + +/** + * Get (or create) a stream to talk to the given peer. + * + * @param target peer we want to communicate with + */ +static struct StreamHandle * +get_stream (const struct GNUNET_PeerIdentity *target) +{ + struct StreamHandle *sh; + + sh = GNUNET_CONTAINER_multihashmap_get (stream_map, + &target->hashPubKey); + if (NULL != sh) + { + if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task) + { + GNUNET_SCHEDULER_cancel (sh->timeout_task); + sh->timeout_task = GNUNET_SCHEDULER_NO_TASK; + } + return sh; + } + sh = GNUNET_malloc (sizeof (struct StreamHandle)); + sh->mst = GNUNET_SERVER_mst_create (&reply_cb, + sh); + sh->target = *target; + sh->stream = GNUNET_STREAM_open (GSF_cfg, + &sh->target, + GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, + &stream_ready_cb, sh, + GNUNET_STREAM_OPTION_END); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (stream_map, + &sh->target.hashPubKey, + sh, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + return sh; +} + + +/** + * Look for a block by directly contacting a particular peer. + * + * @param target peer that should have the block + * @param query hash to query for the block + * @param type desired type for the block + * @param proc function to call with result + * @param proc_cls closure for 'proc' + * @return handle to cancel the operation + */ +struct GSF_StreamRequest * +GSF_stream_query (const struct GNUNET_PeerIdentity *target, + const struct GNUNET_HashCode *query, + enum GNUNET_BLOCK_Type type, + GSF_StreamReplyProcessor proc, void *proc_cls) +{ + struct StreamHandle *sh; + struct GSF_StreamRequest *sr; + + sh = get_stream (target); + sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest)); + sr->sh = sh; + sr->proc = proc; + sr->proc_cls = proc_cls; + sr->type = type; + sr->query = *query; + GNUNET_CONTAINER_DLL_insert (sh->pending_head, + sh->pending_tail, + sr); + if (GNUNET_YES == sh->is_ready) + transmit_pending (sh); + return sr; +} + + +/** + * Task called when it is time to destroy an inactive stream. + * + * @param cls the 'struct StreamHandle' to tear down + * @param tc scheduler context, unused + */ +static void +stream_timeout (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct StreamHandle *sh = cls; + + sh->timeout_task = GNUNET_SCHEDULER_NO_TASK; + destroy_stream_handle (sh); +} + + +/** + * Cancel an active request; must not be called after 'proc' + * was calld. + * + * @param sr request to cancel + */ +void +GSF_stream_query_cancel (struct GSF_StreamRequest *sr) +{ + struct StreamHandle *sh = sr->sh; + + if (GNUNET_YES == sr->was_transmitted) + GNUNET_CONTAINER_DLL_remove (sh->waiting_head, + sh->waiting_tail, + sr); + else + GNUNET_CONTAINER_DLL_remove (sh->pending_head, + sh->pending_tail, + sr); + GNUNET_free (sr); + if ( (NULL == sh->waiting_head) && + (NULL == sh->pending_head) ) + sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, + &stream_timeout, + sh); +} + + +/* ********************* server-side code ************************* */ + /** * We're done with a particular client, clean up. @@ -159,6 +688,9 @@ static struct StreamClient *sc_tail; static void terminate_stream (struct StreamClient *sc) { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# stream connections active"), -1, + GNUNET_NO); if (NULL != sc->rh) GNUNET_STREAM_io_read_cancel (sc->rh); if (NULL != sc->wh) @@ -284,7 +816,12 @@ write_continuation (void *cls, sc->wh = NULL; if ( (GNUNET_STREAM_OK == status) && (size == sc->reply_size) ) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# Blocks transferred via stream"), 1, + GNUNET_NO); continue_reading (sc); + } else terminate_stream (sc); } @@ -361,7 +898,7 @@ handle_datastore_reply (void *cls, /** * Functions with this signature are called whenever a - * complete message is received. + * complete query message is received. * * Do not call GNUNET_SERVER_mst_destroy in callback * @@ -391,11 +928,14 @@ request_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received query for `%s' via stream\n", GNUNET_h2s (&sqm->query)); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# queries received via stream"), 1, + GNUNET_NO); sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, 0, &sqm->query, ntohl (sqm->type), - 0 /* FIXME: priority */, + 0 /* priority */, GSF_datastore_queue_size, GNUNET_TIME_UNIT_FOREVER_REL, &handle_datastore_reply, sc); @@ -430,6 +970,9 @@ accept_cb (void *cls, if (NULL == socket) return GNUNET_SYSERR; + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# stream connections active"), 1, + GNUNET_NO); sc = GNUNET_malloc (sizeof (struct StreamClient)); sc->socket = socket; sc->mst = GNUNET_SERVER_mst_create (&request_cb, @@ -451,6 +994,7 @@ accept_cb (void *cls, void GSF_stream_start () { + stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES); listen_socket = GNUNET_STREAM_listen (GSF_cfg, GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, &accept_cb, NULL, @@ -458,6 +1002,26 @@ GSF_stream_start () } +/** + * Function called on each active streams to shut them down. + * + * @param cls NULL + * @param key target peer, unused + * @param value the 'struct StreamHandle' to destroy + * @return GNUNET_YES (continue to iterate) + */ +static int +release_streams (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct StreamHandle *sh = value; + + destroy_stream_handle (sh); + return GNUNET_YES; +} + + /** * Shutdown subsystem for non-anonymous file-sharing. */ @@ -473,6 +1037,11 @@ GSF_stream_stop () GNUNET_STREAM_listen_close (listen_socket); listen_socket = NULL; } + GNUNET_CONTAINER_multihashmap_iterate (stream_map, + &release_streams, + NULL); + GNUNET_CONTAINER_multihashmap_destroy (stream_map); + stream_map = NULL; } /* end of gnunet-service-fs_stream.c */ diff --git a/src/fs/gnunet-service-fs_stream.h b/src/fs/gnunet-service-fs_stream.h index daa617290..982766763 100644 --- a/src/fs/gnunet-service-fs_stream.h +++ b/src/fs/gnunet-service-fs_stream.h @@ -26,6 +26,55 @@ #ifndef GNUNET_SERVICE_FS_STREAM_H #define GNUNET_SERVICE_FS_STREAM_H +/** + * Handle for a request that is going out via stream API. + */ +struct GSF_StreamRequest; + + +/** + * Function called with a reply from the stream. + * + * @param cls closure + * @param type type of the block, ANY on error + * @param expiration expiration time for the block + * @param data_size number of bytes in 'data', 0 on error + * @param data reply block data, NULL on error + */ +typedef void (*GSF_StreamReplyProcessor)(void *cls, + enum GNUNET_BLOCK_Type type, + struct GNUNET_TIME_Absolute expiration, + size_t data_size, + const void *data); + + +/** + * Look for a block by directly contacting a particular peer. + * + * @param target peer that should have the block + * @param query hash to query for the block + * @param type desired type for the block + * @param proc function to call with result + * @param proc_cls closure for 'proc' + * @return handle to cancel the operation + */ +struct GSF_StreamRequest * +GSF_stream_query (const struct GNUNET_PeerIdentity *target, + const struct GNUNET_HashCode *query, + enum GNUNET_BLOCK_Type type, + GSF_StreamReplyProcessor proc, void *proc_cls); + + +/** + * Cancel an active request; must not be called after 'proc' + * was calld. + * + * @param sr request to cancel + */ +void +GSF_stream_query_cancel (struct GSF_StreamRequest *sr); + + /** * Initialize subsystem for non-anonymous file-sharing. */ -- 2.25.1