From: Christian Grothoff Date: Tue, 4 Jun 2013 13:10:47 +0000 (+0000) Subject: -towards using mesh instead of stream X-Git-Tag: initial-import-from-subversion-38251~8844 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=25b2a459ae5be496f8e4275777812c57f93ea29c;p=oweals%2Fgnunet.git -towards using mesh instead of stream --- diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am index a75e5d4c4..842da5d47 100644 --- a/src/fs/Makefile.am +++ b/src/fs/Makefile.am @@ -193,7 +193,7 @@ gnunet_service_fs_LDADD = \ $(top_builddir)/src/block/libgnunetblock.la \ $(top_builddir)/src/datastore/libgnunetdatastore.la \ $(top_builddir)/src/statistics/libgnunetstatistics.la \ - $(top_builddir)/src/stream/libgnunetstream.la \ + $(top_builddir)/src/mesh/libgnunetmesh.la \ $(top_builddir)/src/ats/libgnunetats.la \ $(top_builddir)/src/core/libgnunetcore.la \ $(top_builddir)/src/util/libgnunetutil.la \ diff --git a/src/fs/gnunet-service-fs_stream.c b/src/fs/gnunet-service-fs_stream.c index 3838b1c24..31a202d50 100644 --- a/src/fs/gnunet-service-fs_stream.c +++ b/src/fs/gnunet-service-fs_stream.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2012 Christian Grothoff (and other contributing authors) + (C) 2012, 2013 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -22,11 +22,26 @@ * @file fs/gnunet-service-fs_stream.c * @brief non-anonymous file-transfer * @author Christian Grothoff + * + * TODO: + * - update comments on functions (still matches 'stream') + * - MESH2 API doesn't allow flow control for server yet (needed!) + * - likely need to register clean up handler with mesh to handle + * client disconnect (likely leaky right now) + * - server is optional, currently client code will NPE if we have + * no server, again MESH2 API requirement forcing this for now + * - message handlers are symmetric for client/server, should be + * separated (currently clients can get requests and servers can + * handle answers, not good) + * - code is entirely untested + * - might have overlooked a few possible simplifications + * - PORT is set to old application type, unsure if we should keep + * it that way (fine for now) */ #include "platform.h" #include "gnunet_constants.h" #include "gnunet_util_lib.h" -#include "gnunet_stream_lib.h" +#include "gnunet_mesh2_service.h" #include "gnunet_protocols.h" #include "gnunet_applications.h" #include "gnunet-service-fs.h" @@ -84,17 +99,12 @@ struct StreamClient /** * Socket for communication. */ - struct GNUNET_STREAM_Socket *socket; - - /** - * Handle for active read operation, or NULL. - */ - struct GNUNET_STREAM_ReadHandle *rh; + struct GNUNET_MESH_Tunnel *socket; /** * Handle for active write operation, or NULL. */ - struct GNUNET_STREAM_WriteHandle *wh; + struct GNUNET_MESH_TransmitHandle *wh; /** * Head of write queue. @@ -106,11 +116,6 @@ struct StreamClient */ struct WriteQueueItem *wqi_tail; - /** - * Tokenizer for requests. - */ - struct GNUNET_SERVER_MessageStreamTokenizer *mst; - /** * Current active request to the datastore, if we have one pending. */ @@ -263,22 +268,12 @@ struct StreamHandle /** * Connection to the other peer. */ - struct GNUNET_STREAM_Socket *stream; - - /** - * Handle for active read operation, or NULL. - */ - struct GNUNET_STREAM_ReadHandle *rh; + struct GNUNET_MESH_Tunnel *stream; /** * Handle for active write operation, or NULL. */ - struct GNUNET_STREAM_WriteHandle *wh; - - /** - * Tokenizer for replies. - */ - struct GNUNET_SERVER_MessageStreamTokenizer *mst; + struct GNUNET_MESH_TransmitHandle *wh; /** * Which peer does this stream go to? @@ -310,7 +305,7 @@ struct StreamHandle /** * Listen socket for incoming requests. */ -static struct GNUNET_STREAM_ListenSocket *listen_socket; +static struct GNUNET_MESH_Handle *listen_socket; /** * Head of DLL of stream clients. @@ -387,14 +382,12 @@ destroy_stream_handle (struct StreamHandle *sh) &free_waiting_entry, sh); if (NULL != sh->wh) - GNUNET_STREAM_write_cancel (sh->wh); - if (NULL != sh->rh) - GNUNET_STREAM_read_cancel (sh->rh); + GNUNET_MESH_notify_transmit_ready_cancel (sh->wh); if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task) GNUNET_SCHEDULER_cancel (sh->timeout_task); if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task) GNUNET_SCHEDULER_cancel (sh->reset_task); - GNUNET_STREAM_close (sh->stream); + GNUNET_MESH_tunnel_destroy (sh->stream); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_remove (stream_map, &sh->target.hashPubKey, @@ -413,23 +406,6 @@ static void transmit_pending (struct StreamHandle *sh); -/** - * Function called once the stream is ready for transmission. - * - * @param cls the 'struct StreamHandle' - * @param socket stream socket handle - */ -static void -stream_ready_cb (void *cls, - struct GNUNET_STREAM_Socket *socket) -{ - struct StreamHandle *sh = cls; - - sh->is_ready = GNUNET_YES; - transmit_pending (sh); -} - - /** * Iterator called on each entry in a waiting map to * move it back to the pending list. @@ -470,21 +446,15 @@ reset_stream (struct StreamHandle *sh) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Resetting stream to %s\n", GNUNET_i2s (&sh->target)); - if (NULL != sh->rh) - { - GNUNET_STREAM_read_cancel (sh->rh); - sh->rh = NULL; - } - GNUNET_STREAM_close (sh->stream); + GNUNET_MESH_tunnel_destroy (sh->stream); sh->is_ready = GNUNET_NO; GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map, &move_to_pending, sh); - sh->stream = GNUNET_STREAM_open (GSF_cfg, - &sh->target, - GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, - &stream_ready_cb, sh, - GNUNET_STREAM_OPTION_END); + sh->stream = GNUNET_MESH_tunnel_create (listen_socket, + sh, + &sh->target, + GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER); } @@ -542,105 +512,33 @@ reset_stream_async (struct StreamHandle *sh) /** - * We got a reply from the stream. Process it. - * - * @param cls the struct StreamHandle - * @param status the status of the stream at the time this function is called - * @param data traffic from the other side - * @param size the number of bytes available in data read; will be 0 on timeout - * @return number of bytes of processed from 'data' (any data remaining should be - * given to the next time the read processor is called). - */ -static size_t -handle_stream_reply (void *cls, - enum GNUNET_STREAM_Status status, - const void *data, - size_t size) -{ - struct StreamHandle *sh = cls; - - sh->rh = NULL; - GNUNET_SCHEDULER_cancel (sh->reset_task); - sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT, - &reset_stream_task, - sh); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received %u bytes from stream to %s\n", - (unsigned int) size, - GNUNET_i2s (&sh->target)); - if (GNUNET_SYSERR == - GNUNET_SERVER_mst_receive (sh->mst, - NULL, - data, size, - GNUNET_NO, GNUNET_NO)) - { - GNUNET_break_op (0); - reset_stream_async (sh); - return size; - } - if (NULL == sh->rh) - sh->rh = GNUNET_STREAM_read (sh->stream, - GNUNET_TIME_UNIT_FOREVER_REL, - &handle_stream_reply, - sh); - return size; -} - - -/** - * Functions of this signature are called whenever we transmitted a + * Functions of this signature are called whenever we are ready to transmit * query via a stream. * * @param cls the struct StreamHandle for which we did the write call - * @param status the status of the stream at the time this function is called; - * GNUNET_OK if writing to stream was completed successfully, - * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the - * mean time. - * @param size the number of bytes written + * @param size the number of bytes that can be written to 'buf' + * @param buf where to write the message + * @return number of bytes written to 'buf' */ -static void -query_write_continuation (void *cls, - enum GNUNET_STREAM_Status status, - size_t size) +static size_t +transmit_sqm (void *cls, + size_t size, + void *buf) { struct StreamHandle *sh = cls; + struct StreamQueryMessage sqm; + struct GSF_StreamRequest *sr; sh->wh = NULL; - if ( (GNUNET_STREAM_OK != status) || - (sizeof (struct StreamQueryMessage) != size) ) + if (NULL == buf) { reset_stream (sh); - return; + return 0; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Successfully transmitted %u bytes via stream to %s\n", - (unsigned int) size, - GNUNET_i2s (&sh->target)); - if (NULL == sh->rh) - sh->rh = GNUNET_STREAM_read (sh->stream, - GNUNET_TIME_UNIT_FOREVER_REL, - &handle_stream_reply, - sh); - transmit_pending (sh); -} - - -/** - * Transmit pending requests via the stream. - * - * @param sh stream to process - */ -static void -transmit_pending (struct StreamHandle *sh) -{ - struct StreamQueryMessage sqm; - struct GSF_StreamRequest *sr; - - if (NULL != sh->wh) - return; sr = sh->pending_head; if (NULL == sr) - return; + return 0; + GNUNET_assert (size >= sizeof (struct StreamQueryMessage)); GNUNET_CONTAINER_DLL_remove (sh->pending_head, sh->pending_tail, sr); @@ -656,11 +554,30 @@ transmit_pending (struct StreamHandle *sh) sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY); sqm.type = htonl (sr->type); sqm.query = sr->query; - sh->wh = GNUNET_STREAM_write (sh->stream, - &sqm, sizeof (sqm), - GNUNET_TIME_UNIT_FOREVER_REL, - &query_write_continuation, - sh); + memcpy (buf, &sqm, sizeof (sqm)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Successfully transmitted %u bytes via mesh to %s\n", + (unsigned int) size, + GNUNET_i2s (&sh->target)); + transmit_pending (sh); + return sizeof (sqm); +} + + +/** + * Transmit pending requests via the stream. + * + * @param sh stream to process + */ +static void +transmit_pending (struct StreamHandle *sh) +{ + if (NULL != sh->wh) + return; + sh->wh = GNUNET_MESH_notify_transmit_ready (sh->stream, GNUNET_YES /* allow cork */, + GNUNET_TIME_UNIT_FOREVER_REL, + sizeof (struct StreamQueryMessage), + &transmit_sqm, sh); } @@ -729,8 +646,6 @@ handle_reply (void *cls, * Functions with this signature are called whenever a * complete reply is received. * - * Do not call GNUNET_SERVER_mst_destroy in callback - * * @param cls closure with the 'struct StreamHandle' * @param client identification of the client, NULL * @param message the actual message @@ -738,10 +653,12 @@ handle_reply (void *cls, */ static int reply_cb (void *cls, - void *client, + struct GNUNET_MESH_Tunnel *tunnel, + void **tunnel_ctx, + const struct GNUNET_PeerIdentity *sender, const struct GNUNET_MessageHeader *message) { - struct StreamHandle *sh = cls; + struct StreamHandle *sh = *tunnel_ctx; const struct StreamReplyMessage *srm; struct HandleReplyClosure hrc; uint16_t msize; @@ -749,55 +666,47 @@ reply_cb (void *cls, struct GNUNET_HashCode query; msize = ntohs (message->size); - switch (ntohs (message->type)) + if (sizeof (struct StreamReplyMessage) > msize) { - case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY: - if (sizeof (struct StreamReplyMessage) > msize) - { - GNUNET_break_op (0); - reset_stream_async (sh); - return GNUNET_SYSERR; - } - srm = (const struct StreamReplyMessage *) message; - msize -= sizeof (struct StreamReplyMessage); - type = (enum GNUNET_BLOCK_Type) ntohl (srm->type); - if (GNUNET_YES != - GNUNET_BLOCK_get_key (GSF_block_ctx, - type, - &srm[1], msize, &query)) - { - GNUNET_break_op (0); - reset_stream_async (sh); - return GNUNET_SYSERR; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received reply `%s' via stream\n", - GNUNET_h2s (&query)); - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# replies received via stream"), 1, - GNUNET_NO); - hrc.data = &srm[1]; - hrc.data_size = msize; - hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration); - hrc.type = type; - hrc.found = GNUNET_NO; - GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map, - &query, - &handle_reply, - &hrc); - if (GNUNET_NO == hrc.found) - { - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# replies received via stream dropped"), 1, - GNUNET_NO); - return GNUNET_OK; - } - return GNUNET_OK; - default: GNUNET_break_op (0); reset_stream_async (sh); return GNUNET_SYSERR; } + srm = (const struct StreamReplyMessage *) message; + msize -= sizeof (struct StreamReplyMessage); + type = (enum GNUNET_BLOCK_Type) ntohl (srm->type); + if (GNUNET_YES != + GNUNET_BLOCK_get_key (GSF_block_ctx, + type, + &srm[1], msize, &query)) + { + GNUNET_break_op (0); + reset_stream_async (sh); + return GNUNET_SYSERR; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received reply `%s' via stream\n", + GNUNET_h2s (&query)); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# replies received via stream"), 1, + GNUNET_NO); + hrc.data = &srm[1]; + hrc.data_size = msize; + hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration); + hrc.type = type; + hrc.found = GNUNET_NO; + GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map, + &query, + &handle_reply, + &hrc); + if (GNUNET_NO == hrc.found) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# replies received via stream dropped"), 1, + GNUNET_NO); + return GNUNET_OK; + } + return GNUNET_OK; } @@ -829,15 +738,12 @@ get_stream (const struct GNUNET_PeerIdentity *target) sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT, &reset_stream_task, sh); - sh->mst = GNUNET_SERVER_mst_create (&reply_cb, - sh); sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES); sh->target = *target; - sh->stream = GNUNET_STREAM_open (GSF_cfg, - &sh->target, - GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, - &stream_ready_cb, sh, - GNUNET_STREAM_OPTION_END); + sh->stream = GNUNET_MESH_tunnel_create (listen_socket, + sh, + &sh->target, + GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (stream_map, &sh->target.hashPubKey, @@ -933,14 +839,11 @@ terminate_stream (struct StreamClient *sc) GNUNET_SCHEDULER_cancel (sc->terminate_task); if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task) GNUNET_SCHEDULER_cancel (sc->timeout_task); - if (NULL != sc->rh) - GNUNET_STREAM_read_cancel (sc->rh); if (NULL != sc->wh) - GNUNET_STREAM_write_cancel (sc->wh); + GNUNET_MESH_notify_transmit_ready_cancel (sc->wh); if (NULL != sc->qe) GNUNET_DATASTORE_cancel (sc->qe); - GNUNET_SERVER_mst_destroy (sc->mst); - GNUNET_STREAM_close (sc->socket); + GNUNET_MESH_tunnel_destroy (sc->socket); struct WriteQueueItem *wqi; while (NULL != (wqi = sc->wqi_head)) { @@ -949,8 +852,6 @@ terminate_stream (struct StreamClient *sc) wqi); GNUNET_free (wqi); } - - GNUNET_CONTAINER_DLL_remove (sc_head, sc_tail, sc); @@ -959,23 +860,6 @@ terminate_stream (struct StreamClient *sc) } -/** - * Task run to asynchronously terminate the stream. - * - * @param cls the 'struct StreamClient' - * @param tc scheduler context - */ -static void -terminate_stream_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct StreamClient *sc = cls; - - sc->terminate_task = GNUNET_SCHEDULER_NO_TASK; - terminate_stream (sc); -} - - /** * Task run to asynchronously terminate the stream due to timeout. * @@ -1009,39 +893,6 @@ refresh_timeout_task (struct StreamClient *sc) } -/** - * We had a serious error, termiante stream, - * but do so asynchronously. - * - * @param sc stream to reset - */ -static void -terminate_stream_async (struct StreamClient *sc) -{ - if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task) - sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task, - sc); -} - - -/** - * Functions of this signature are called whenever data is available from the - * stream. - * - * @param cls the closure from GNUNET_STREAM_read - * @param status the status of the stream at the time this function is called - * @param data traffic from the other side - * @param size the number of bytes available in data read; will be 0 on timeout - * @return number of bytes of processed from 'data' (any data remaining should be - * given to the next time the read processor is called). - */ -static size_t -process_request (void *cls, - enum GNUNET_STREAM_Status status, - const void *data, - size_t size); - - /** * We're done handling a request from a client, read the next one. * @@ -1050,22 +901,7 @@ process_request (void *cls, static void continue_reading (struct StreamClient *sc) { - int ret; - - ret = - GNUNET_SERVER_mst_receive (sc->mst, - NULL, - NULL, 0, - GNUNET_NO, GNUNET_NO); - if (GNUNET_NO == ret) - return; refresh_timeout_task (sc); - if (NULL != sc->rh) - return; - sc->rh = GNUNET_STREAM_read (sc->socket, - GNUNET_TIME_UNIT_FOREVER_REL, - &process_request, - sc); } @@ -1079,93 +915,49 @@ continue_writing (struct StreamClient *sc); /** - * Functions of this signature are called whenever data is available from the - * stream. - * - * @param cls the closure from GNUNET_STREAM_read - * @param status the status of the stream at the time this function is called - * @param data traffic from the other side - * @param size the number of bytes available in data read; will be 0 on timeout - * @return number of bytes of processed from 'data' (any data remaining should be - * given to the next time the read processor is called). - */ -static size_t -process_request (void *cls, - enum GNUNET_STREAM_Status status, - const void *data, - size_t size) -{ - struct StreamClient *sc = cls; - int ret; - - sc->rh = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received %u byte query via stream\n", - (unsigned int) size); - switch (status) - { - case GNUNET_STREAM_OK: - ret = - GNUNET_SERVER_mst_receive (sc->mst, - NULL, - data, size, - GNUNET_NO, GNUNET_NO); - if (GNUNET_NO == ret) - return size; /* more messages in MST */ - if (GNUNET_SYSERR == ret) - { - GNUNET_break_op (0); - terminate_stream_async (sc); - return size; - } - break; - case GNUNET_STREAM_TIMEOUT: - case GNUNET_STREAM_SHUTDOWN: - case GNUNET_STREAM_SYSERR: - terminate_stream_async (sc); - return size; - default: - GNUNET_break (0); - return size; - } - continue_writing (sc); - return size; -} - - -/** - * Sending a reply was completed, continue processing. + * Send a reply now, mesh is ready. * * @param cls closure with the struct StreamClient which sent the query - * @param status result code for the operation - * @param size number of bytes that were transmitted + * @param size number of bytes available in 'buf' + * @param buf where to write the message + * @return number of bytes written to 'buf' */ -static void +static size_t write_continuation (void *cls, - enum GNUNET_STREAM_Status status, - size_t size) + size_t size, + void *buf) { struct StreamClient *sc = cls; - + struct WriteQueueItem *wqi; + size_t ret; + sc->wh = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Write continuation called on 'server' side with status %d\n", - status); - if ( (GNUNET_STREAM_OK != status) || - (size != sc->reply_size) ) + if (NULL == (wqi = sc->wqi_head)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Write queue empty, reading more requests\n"); + return 0; + } + if (0 == size) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission of reply failed, terminating stream\n"); terminate_stream (sc); - return; + return 0; } + GNUNET_CONTAINER_DLL_remove (sc->wqi_head, + sc->wqi_tail, + wqi); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitted %u byte reply via stream\n", (unsigned int) size); GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# Blocks transferred via stream"), 1, GNUNET_NO); + memcpy (buf, &wqi[1], ret = wqi->msize); + GNUNET_free (wqi); continue_writing (sc); + return ret; } @@ -1192,19 +984,11 @@ continue_writing (struct StreamClient *sc) continue_reading (sc); return; } - GNUNET_CONTAINER_DLL_remove (sc->wqi_head, - sc->wqi_tail, - wqi); - sc->wh = GNUNET_STREAM_write (sc->socket, - &wqi[1], wqi->msize, - GNUNET_TIME_UNIT_FOREVER_REL, - &write_continuation, - sc); - if (NULL != sc->wh) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Gave %u bytes for stream for transmission\n", - (unsigned int) wqi->msize); - GNUNET_free (wqi); + sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO, + GNUNET_TIME_UNIT_FOREVER_REL, + wqi->msize, + &write_continuation, + sc); if (NULL == sc->wh) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1302,50 +1086,37 @@ handle_datastore_reply (void *cls, */ static int request_cb (void *cls, - void *client, + struct GNUNET_MESH_Tunnel *tunnel, + void **tunnel_ctx, + const struct GNUNET_PeerIdentity *sender, const struct GNUNET_MessageHeader *message) { - struct StreamClient *sc = cls; + struct StreamClient *sc = *tunnel_ctx; const struct StreamQueryMessage *sqm; - switch (ntohs (message->type)) + sqm = (const struct StreamQueryMessage *) message; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received query for `%s' via stream\n", + GNUNET_h2s (&sqm->query)); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# queries received via stream"), 1, + GNUNET_NO); + refresh_timeout_task (sc); + sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, + 0, + &sqm->query, + ntohl (sqm->type), + 0 /* priority */, + GSF_datastore_queue_size, + GNUNET_TIME_UNIT_FOREVER_REL, + &handle_datastore_reply, sc); + if (NULL == sc->qe) { - case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY: - if (sizeof (struct StreamQueryMessage) != - ntohs (message->size)) - { - GNUNET_break_op (0); - terminate_stream_async (sc); - return GNUNET_SYSERR; - } - sqm = (const struct StreamQueryMessage *) message; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received query for `%s' via stream\n", - GNUNET_h2s (&sqm->query)); - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# queries received via stream"), 1, - GNUNET_NO); - refresh_timeout_task (sc); - sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, - 0, - &sqm->query, - ntohl (sqm->type), - 0 /* priority */, - GSF_datastore_queue_size, - GNUNET_TIME_UNIT_FOREVER_REL, - &handle_datastore_reply, sc); - if (NULL == sc->qe) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Queueing request with datastore failed (queue full?)\n"); - continue_writing (sc); - } - return GNUNET_OK; - default: - GNUNET_break_op (0); - terminate_stream_async (sc); - return GNUNET_SYSERR; + "Queueing request with datastore failed (queue full?)\n"); + continue_writing (sc); } + return GNUNET_OK; } @@ -1355,27 +1126,27 @@ request_cb (void *cls, * GNUNET_STREAM_listen() is already taken. * * @param cls the closure from GNUNET_STREAM_listen - * @param socket the socket representing the stream; NULL on binding error + * @param socket the socket representing the stream * @param initiator the identity of the peer who wants to establish a stream * with us; NULL on binding error - * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the - * stream (the socket will be invalid after the call) + * @return initial tunnel context (our 'struct StreamClient') */ -static int +static void * accept_cb (void *cls, - struct GNUNET_STREAM_Socket *socket, - const struct GNUNET_PeerIdentity *initiator) + struct GNUNET_MESH_Tunnel *socket, + const struct GNUNET_PeerIdentity *initiator, + uint32_t port) { struct StreamClient *sc; - if (NULL == socket) - return GNUNET_SYSERR; + GNUNET_assert (NULL != socket); if (sc_count >= sc_count_max) { GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# stream client connections rejected"), 1, GNUNET_NO); - return GNUNET_SYSERR; + GNUNET_MESH_tunnel_destroy (socket); + return NULL; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Accepting inbound stream connection from `%s'\n", @@ -1385,18 +1156,12 @@ accept_cb (void *cls, GNUNET_NO); sc = GNUNET_malloc (sizeof (struct StreamClient)); sc->socket = socket; - sc->mst = GNUNET_SERVER_mst_create (&request_cb, - sc); - sc->rh = GNUNET_STREAM_read (sc->socket, - GNUNET_TIME_UNIT_FOREVER_REL, - &process_request, - sc); GNUNET_CONTAINER_DLL_insert (sc_head, sc_tail, sc); sc_count++; refresh_timeout_task (sc); - return GNUNET_OK; + return sc; } @@ -1406,6 +1171,16 @@ accept_cb (void *cls, void GSF_stream_start () { + static const struct GNUNET_MESH_MessageHandler handlers[] = { + { &request_cb, GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY, sizeof (struct StreamQueryMessage)}, + { &reply_cb, GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY, 0 }, + { NULL, 0, 0 } + }; + static const uint32_t ports[] = { + GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, + 0 + }; + stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES); if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_number (GSF_cfg, @@ -1413,10 +1188,12 @@ GSF_stream_start () "MAX_STREAM_CLIENTS", &sc_count_max)) { - listen_socket = GNUNET_STREAM_listen (GSF_cfg, - GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, - &accept_cb, NULL, - GNUNET_STREAM_OPTION_END); + listen_socket = GNUNET_MESH_connect (GSF_cfg, + NULL, + &accept_cb, + NULL /* FIXME: have a cleanup callback? */, + handlers, + ports); } } @@ -1453,7 +1230,7 @@ GSF_stream_stop () terminate_stream (sc); if (NULL != listen_socket) { - GNUNET_STREAM_listen_close (listen_socket); + GNUNET_MESH_disconnect (listen_socket); listen_socket = NULL; } GNUNET_CONTAINER_multihashmap_iterate (stream_map,