/*
This file is part of GNUnet.
- (C) 2012 Christian Grothoff (and other contributing authors)
+ (C) 2012, 2013 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
* @file fs/gnunet-service-fs_stream.c
* @brief non-anonymous file-transfer
* @author Christian Grothoff
+ *
+ * TODO:
+ * - update comments on functions (still matches 'stream')
+ * - MESH2 API doesn't allow flow control for server yet (needed!)
+ * - likely need to register clean up handler with mesh to handle
+ * client disconnect (likely leaky right now)
+ * - server is optional, currently client code will NPE if we have
+ * no server, again MESH2 API requirement forcing this for now
+ * - message handlers are symmetric for client/server, should be
+ * separated (currently clients can get requests and servers can
+ * handle answers, not good)
+ * - code is entirely untested
+ * - might have overlooked a few possible simplifications
+ * - PORT is set to old application type, unsure if we should keep
+ * it that way (fine for now)
*/
#include "platform.h"
#include "gnunet_constants.h"
#include "gnunet_util_lib.h"
-#include "gnunet_stream_lib.h"
+#include "gnunet_mesh2_service.h"
#include "gnunet_protocols.h"
#include "gnunet_applications.h"
#include "gnunet-service-fs.h"
/**
* Socket for communication.
*/
- struct GNUNET_STREAM_Socket *socket;
-
- /**
- * Handle for active read operation, or NULL.
- */
- struct GNUNET_STREAM_ReadHandle *rh;
+ struct GNUNET_MESH_Tunnel *socket;
/**
* Handle for active write operation, or NULL.
*/
- struct GNUNET_STREAM_WriteHandle *wh;
+ struct GNUNET_MESH_TransmitHandle *wh;
/**
* Head of write queue.
*/
struct WriteQueueItem *wqi_tail;
- /**
- * Tokenizer for requests.
- */
- struct GNUNET_SERVER_MessageStreamTokenizer *mst;
-
/**
* Current active request to the datastore, if we have one pending.
*/
/**
* Connection to the other peer.
*/
- struct GNUNET_STREAM_Socket *stream;
-
- /**
- * Handle for active read operation, or NULL.
- */
- struct GNUNET_STREAM_ReadHandle *rh;
+ struct GNUNET_MESH_Tunnel *stream;
/**
* Handle for active write operation, or NULL.
*/
- struct GNUNET_STREAM_WriteHandle *wh;
-
- /**
- * Tokenizer for replies.
- */
- struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+ struct GNUNET_MESH_TransmitHandle *wh;
/**
* Which peer does this stream go to?
/**
* Listen socket for incoming requests.
*/
-static struct GNUNET_STREAM_ListenSocket *listen_socket;
+static struct GNUNET_MESH_Handle *listen_socket;
/**
* Head of DLL of stream clients.
&free_waiting_entry,
sh);
if (NULL != sh->wh)
- GNUNET_STREAM_write_cancel (sh->wh);
- if (NULL != sh->rh)
- GNUNET_STREAM_read_cancel (sh->rh);
+ GNUNET_MESH_notify_transmit_ready_cancel (sh->wh);
if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
GNUNET_SCHEDULER_cancel (sh->timeout_task);
if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
GNUNET_SCHEDULER_cancel (sh->reset_task);
- GNUNET_STREAM_close (sh->stream);
+ GNUNET_MESH_tunnel_destroy (sh->stream);
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_remove (stream_map,
&sh->target.hashPubKey,
transmit_pending (struct StreamHandle *sh);
-/**
- * Function called once the stream is ready for transmission.
- *
- * @param cls the 'struct StreamHandle'
- * @param socket stream socket handle
- */
-static void
-stream_ready_cb (void *cls,
- struct GNUNET_STREAM_Socket *socket)
-{
- struct StreamHandle *sh = cls;
-
- sh->is_ready = GNUNET_YES;
- transmit_pending (sh);
-}
-
-
/**
* Iterator called on each entry in a waiting map to
* move it back to the pending list.
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Resetting stream to %s\n",
GNUNET_i2s (&sh->target));
- if (NULL != sh->rh)
- {
- GNUNET_STREAM_read_cancel (sh->rh);
- sh->rh = NULL;
- }
- GNUNET_STREAM_close (sh->stream);
+ GNUNET_MESH_tunnel_destroy (sh->stream);
sh->is_ready = GNUNET_NO;
GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
&move_to_pending,
sh);
- sh->stream = GNUNET_STREAM_open (GSF_cfg,
- &sh->target,
- GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
- &stream_ready_cb, sh,
- GNUNET_STREAM_OPTION_END);
+ sh->stream = GNUNET_MESH_tunnel_create (listen_socket,
+ sh,
+ &sh->target,
+ GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER);
}
/**
- * We got a reply from the stream. Process it.
- *
- * @param cls the struct StreamHandle
- * @param status the status of the stream at the time this function is called
- * @param data traffic from the other side
- * @param size the number of bytes available in data read; will be 0 on timeout
- * @return number of bytes of processed from 'data' (any data remaining should be
- * given to the next time the read processor is called).
- */
-static size_t
-handle_stream_reply (void *cls,
- enum GNUNET_STREAM_Status status,
- const void *data,
- size_t size)
-{
- struct StreamHandle *sh = cls;
-
- sh->rh = NULL;
- GNUNET_SCHEDULER_cancel (sh->reset_task);
- sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
- &reset_stream_task,
- sh);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received %u bytes from stream to %s\n",
- (unsigned int) size,
- GNUNET_i2s (&sh->target));
- if (GNUNET_SYSERR ==
- GNUNET_SERVER_mst_receive (sh->mst,
- NULL,
- data, size,
- GNUNET_NO, GNUNET_NO))
- {
- GNUNET_break_op (0);
- reset_stream_async (sh);
- return size;
- }
- if (NULL == sh->rh)
- sh->rh = GNUNET_STREAM_read (sh->stream,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &handle_stream_reply,
- sh);
- return size;
-}
-
-
-/**
- * Functions of this signature are called whenever we transmitted a
+ * Functions of this signature are called whenever we are ready to transmit
* query via a stream.
*
* @param cls the struct StreamHandle for which we did the write call
- * @param status the status of the stream at the time this function is called;
- * GNUNET_OK if writing to stream was completed successfully,
- * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
- * mean time.
- * @param size the number of bytes written
+ * @param size the number of bytes that can be written to 'buf'
+ * @param buf where to write the message
+ * @return number of bytes written to 'buf'
*/
-static void
-query_write_continuation (void *cls,
- enum GNUNET_STREAM_Status status,
- size_t size)
+static size_t
+transmit_sqm (void *cls,
+ size_t size,
+ void *buf)
{
struct StreamHandle *sh = cls;
+ struct StreamQueryMessage sqm;
+ struct GSF_StreamRequest *sr;
sh->wh = NULL;
- if ( (GNUNET_STREAM_OK != status) ||
- (sizeof (struct StreamQueryMessage) != size) )
+ if (NULL == buf)
{
reset_stream (sh);
- return;
+ return 0;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Successfully transmitted %u bytes via stream to %s\n",
- (unsigned int) size,
- GNUNET_i2s (&sh->target));
- if (NULL == sh->rh)
- sh->rh = GNUNET_STREAM_read (sh->stream,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &handle_stream_reply,
- sh);
- transmit_pending (sh);
-}
-
-
-/**
- * Transmit pending requests via the stream.
- *
- * @param sh stream to process
- */
-static void
-transmit_pending (struct StreamHandle *sh)
-{
- struct StreamQueryMessage sqm;
- struct GSF_StreamRequest *sr;
-
- if (NULL != sh->wh)
- return;
sr = sh->pending_head;
if (NULL == sr)
- return;
+ return 0;
+ GNUNET_assert (size >= sizeof (struct StreamQueryMessage));
GNUNET_CONTAINER_DLL_remove (sh->pending_head,
sh->pending_tail,
sr);
sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
sqm.type = htonl (sr->type);
sqm.query = sr->query;
- sh->wh = GNUNET_STREAM_write (sh->stream,
- &sqm, sizeof (sqm),
- GNUNET_TIME_UNIT_FOREVER_REL,
- &query_write_continuation,
- sh);
+ memcpy (buf, &sqm, sizeof (sqm));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Successfully transmitted %u bytes via mesh to %s\n",
+ (unsigned int) size,
+ GNUNET_i2s (&sh->target));
+ transmit_pending (sh);
+ return sizeof (sqm);
+}
+
+
+/**
+ * Transmit pending requests via the stream.
+ *
+ * @param sh stream to process
+ */
+static void
+transmit_pending (struct StreamHandle *sh)
+{
+ if (NULL != sh->wh)
+ return;
+ sh->wh = GNUNET_MESH_notify_transmit_ready (sh->stream, GNUNET_YES /* allow cork */,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ sizeof (struct StreamQueryMessage),
+ &transmit_sqm, sh);
}
* Functions with this signature are called whenever a
* complete reply is received.
*
- * Do not call GNUNET_SERVER_mst_destroy in callback
- *
* @param cls closure with the 'struct StreamHandle'
* @param client identification of the client, NULL
* @param message the actual message
*/
static int
reply_cb (void *cls,
- void *client,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
const struct GNUNET_MessageHeader *message)
{
- struct StreamHandle *sh = cls;
+ struct StreamHandle *sh = *tunnel_ctx;
const struct StreamReplyMessage *srm;
struct HandleReplyClosure hrc;
uint16_t msize;
struct GNUNET_HashCode query;
msize = ntohs (message->size);
- switch (ntohs (message->type))
+ if (sizeof (struct StreamReplyMessage) > msize)
{
- case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY:
- if (sizeof (struct StreamReplyMessage) > msize)
- {
- GNUNET_break_op (0);
- reset_stream_async (sh);
- return GNUNET_SYSERR;
- }
- srm = (const struct StreamReplyMessage *) message;
- msize -= sizeof (struct StreamReplyMessage);
- type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
- if (GNUNET_YES !=
- GNUNET_BLOCK_get_key (GSF_block_ctx,
- type,
- &srm[1], msize, &query))
- {
- GNUNET_break_op (0);
- reset_stream_async (sh);
- return GNUNET_SYSERR;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received reply `%s' via stream\n",
- GNUNET_h2s (&query));
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# replies received via stream"), 1,
- GNUNET_NO);
- hrc.data = &srm[1];
- hrc.data_size = msize;
- hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
- hrc.type = type;
- hrc.found = GNUNET_NO;
- GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
- &query,
- &handle_reply,
- &hrc);
- if (GNUNET_NO == hrc.found)
- {
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# replies received via stream dropped"), 1,
- GNUNET_NO);
- return GNUNET_OK;
- }
- return GNUNET_OK;
- default:
GNUNET_break_op (0);
reset_stream_async (sh);
return GNUNET_SYSERR;
}
+ srm = (const struct StreamReplyMessage *) message;
+ msize -= sizeof (struct StreamReplyMessage);
+ type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
+ if (GNUNET_YES !=
+ GNUNET_BLOCK_get_key (GSF_block_ctx,
+ type,
+ &srm[1], msize, &query))
+ {
+ GNUNET_break_op (0);
+ reset_stream_async (sh);
+ return GNUNET_SYSERR;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received reply `%s' via stream\n",
+ GNUNET_h2s (&query));
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# replies received via stream"), 1,
+ GNUNET_NO);
+ hrc.data = &srm[1];
+ hrc.data_size = msize;
+ hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
+ hrc.type = type;
+ hrc.found = GNUNET_NO;
+ GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
+ &query,
+ &handle_reply,
+ &hrc);
+ if (GNUNET_NO == hrc.found)
+ {
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# replies received via stream dropped"), 1,
+ GNUNET_NO);
+ return GNUNET_OK;
+ }
+ return GNUNET_OK;
}
sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
&reset_stream_task,
sh);
- sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
- sh);
sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
sh->target = *target;
- sh->stream = GNUNET_STREAM_open (GSF_cfg,
- &sh->target,
- GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
- &stream_ready_cb, sh,
- GNUNET_STREAM_OPTION_END);
+ sh->stream = GNUNET_MESH_tunnel_create (listen_socket,
+ sh,
+ &sh->target,
+ GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER);
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (stream_map,
&sh->target.hashPubKey,
GNUNET_SCHEDULER_cancel (sc->terminate_task);
if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
GNUNET_SCHEDULER_cancel (sc->timeout_task);
- if (NULL != sc->rh)
- GNUNET_STREAM_read_cancel (sc->rh);
if (NULL != sc->wh)
- GNUNET_STREAM_write_cancel (sc->wh);
+ GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
if (NULL != sc->qe)
GNUNET_DATASTORE_cancel (sc->qe);
- GNUNET_SERVER_mst_destroy (sc->mst);
- GNUNET_STREAM_close (sc->socket);
+ GNUNET_MESH_tunnel_destroy (sc->socket);
struct WriteQueueItem *wqi;
while (NULL != (wqi = sc->wqi_head))
{
wqi);
GNUNET_free (wqi);
}
-
-
GNUNET_CONTAINER_DLL_remove (sc_head,
sc_tail,
sc);
}
-/**
- * Task run to asynchronously terminate the stream.
- *
- * @param cls the 'struct StreamClient'
- * @param tc scheduler context
- */
-static void
-terminate_stream_task (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct StreamClient *sc = cls;
-
- sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
- terminate_stream (sc);
-}
-
-
/**
* Task run to asynchronously terminate the stream due to timeout.
*
}
-/**
- * We had a serious error, termiante stream,
- * but do so asynchronously.
- *
- * @param sc stream to reset
- */
-static void
-terminate_stream_async (struct StreamClient *sc)
-{
- if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
- sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
- sc);
-}
-
-
-/**
- * Functions of this signature are called whenever data is available from the
- * stream.
- *
- * @param cls the closure from GNUNET_STREAM_read
- * @param status the status of the stream at the time this function is called
- * @param data traffic from the other side
- * @param size the number of bytes available in data read; will be 0 on timeout
- * @return number of bytes of processed from 'data' (any data remaining should be
- * given to the next time the read processor is called).
- */
-static size_t
-process_request (void *cls,
- enum GNUNET_STREAM_Status status,
- const void *data,
- size_t size);
-
-
/**
* We're done handling a request from a client, read the next one.
*
static void
continue_reading (struct StreamClient *sc)
{
- int ret;
-
- ret =
- GNUNET_SERVER_mst_receive (sc->mst,
- NULL,
- NULL, 0,
- GNUNET_NO, GNUNET_NO);
- if (GNUNET_NO == ret)
- return;
refresh_timeout_task (sc);
- if (NULL != sc->rh)
- return;
- sc->rh = GNUNET_STREAM_read (sc->socket,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &process_request,
- sc);
}
/**
- * Functions of this signature are called whenever data is available from the
- * stream.
- *
- * @param cls the closure from GNUNET_STREAM_read
- * @param status the status of the stream at the time this function is called
- * @param data traffic from the other side
- * @param size the number of bytes available in data read; will be 0 on timeout
- * @return number of bytes of processed from 'data' (any data remaining should be
- * given to the next time the read processor is called).
- */
-static size_t
-process_request (void *cls,
- enum GNUNET_STREAM_Status status,
- const void *data,
- size_t size)
-{
- struct StreamClient *sc = cls;
- int ret;
-
- sc->rh = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received %u byte query via stream\n",
- (unsigned int) size);
- switch (status)
- {
- case GNUNET_STREAM_OK:
- ret =
- GNUNET_SERVER_mst_receive (sc->mst,
- NULL,
- data, size,
- GNUNET_NO, GNUNET_NO);
- if (GNUNET_NO == ret)
- return size; /* more messages in MST */
- if (GNUNET_SYSERR == ret)
- {
- GNUNET_break_op (0);
- terminate_stream_async (sc);
- return size;
- }
- break;
- case GNUNET_STREAM_TIMEOUT:
- case GNUNET_STREAM_SHUTDOWN:
- case GNUNET_STREAM_SYSERR:
- terminate_stream_async (sc);
- return size;
- default:
- GNUNET_break (0);
- return size;
- }
- continue_writing (sc);
- return size;
-}
-
-
-/**
- * Sending a reply was completed, continue processing.
+ * Send a reply now, mesh is ready.
*
* @param cls closure with the struct StreamClient which sent the query
- * @param status result code for the operation
- * @param size number of bytes that were transmitted
+ * @param size number of bytes available in 'buf'
+ * @param buf where to write the message
+ * @return number of bytes written to 'buf'
*/
-static void
+static size_t
write_continuation (void *cls,
- enum GNUNET_STREAM_Status status,
- size_t size)
+ size_t size,
+ void *buf)
{
struct StreamClient *sc = cls;
-
+ struct WriteQueueItem *wqi;
+ size_t ret;
+
sc->wh = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Write continuation called on 'server' side with status %d\n",
- status);
- if ( (GNUNET_STREAM_OK != status) ||
- (size != sc->reply_size) )
+ if (NULL == (wqi = sc->wqi_head))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Write queue empty, reading more requests\n");
+ return 0;
+ }
+ if (0 == size)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transmission of reply failed, terminating stream\n");
terminate_stream (sc);
- return;
+ return 0;
}
+ GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
+ sc->wqi_tail,
+ wqi);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transmitted %u byte reply via stream\n",
(unsigned int) size);
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# Blocks transferred via stream"), 1,
GNUNET_NO);
+ memcpy (buf, &wqi[1], ret = wqi->msize);
+ GNUNET_free (wqi);
continue_writing (sc);
+ return ret;
}
continue_reading (sc);
return;
}
- GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
- sc->wqi_tail,
- wqi);
- sc->wh = GNUNET_STREAM_write (sc->socket,
- &wqi[1], wqi->msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &write_continuation,
- sc);
- if (NULL != sc->wh)
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Gave %u bytes for stream for transmission\n",
- (unsigned int) wqi->msize);
- GNUNET_free (wqi);
+ sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ wqi->msize,
+ &write_continuation,
+ sc);
if (NULL == sc->wh)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
*/
static int
request_cb (void *cls,
- void *client,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
const struct GNUNET_MessageHeader *message)
{
- struct StreamClient *sc = cls;
+ struct StreamClient *sc = *tunnel_ctx;
const struct StreamQueryMessage *sqm;
- switch (ntohs (message->type))
+ sqm = (const struct StreamQueryMessage *) message;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received query for `%s' via stream\n",
+ GNUNET_h2s (&sqm->query));
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# queries received via stream"), 1,
+ GNUNET_NO);
+ refresh_timeout_task (sc);
+ sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
+ 0,
+ &sqm->query,
+ ntohl (sqm->type),
+ 0 /* priority */,
+ GSF_datastore_queue_size,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &handle_datastore_reply, sc);
+ if (NULL == sc->qe)
{
- case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
- if (sizeof (struct StreamQueryMessage) !=
- ntohs (message->size))
- {
- GNUNET_break_op (0);
- terminate_stream_async (sc);
- return GNUNET_SYSERR;
- }
- sqm = (const struct StreamQueryMessage *) message;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received query for `%s' via stream\n",
- GNUNET_h2s (&sqm->query));
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# queries received via stream"), 1,
- GNUNET_NO);
- refresh_timeout_task (sc);
- sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
- 0,
- &sqm->query,
- ntohl (sqm->type),
- 0 /* priority */,
- GSF_datastore_queue_size,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &handle_datastore_reply, sc);
- if (NULL == sc->qe)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Queueing request with datastore failed (queue full?)\n");
- continue_writing (sc);
- }
- return GNUNET_OK;
- default:
- GNUNET_break_op (0);
- terminate_stream_async (sc);
- return GNUNET_SYSERR;
+ "Queueing request with datastore failed (queue full?)\n");
+ continue_writing (sc);
}
+ return GNUNET_OK;
}
* GNUNET_STREAM_listen() is already taken.
*
* @param cls the closure from GNUNET_STREAM_listen
- * @param socket the socket representing the stream; NULL on binding error
+ * @param socket the socket representing the stream
* @param initiator the identity of the peer who wants to establish a stream
* with us; NULL on binding error
- * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
- * stream (the socket will be invalid after the call)
+ * @return initial tunnel context (our 'struct StreamClient')
*/
-static int
+static void *
accept_cb (void *cls,
- struct GNUNET_STREAM_Socket *socket,
- const struct GNUNET_PeerIdentity *initiator)
+ struct GNUNET_MESH_Tunnel *socket,
+ const struct GNUNET_PeerIdentity *initiator,
+ uint32_t port)
{
struct StreamClient *sc;
- if (NULL == socket)
- return GNUNET_SYSERR;
+ GNUNET_assert (NULL != socket);
if (sc_count >= sc_count_max)
{
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# stream client connections rejected"), 1,
GNUNET_NO);
- return GNUNET_SYSERR;
+ GNUNET_MESH_tunnel_destroy (socket);
+ return NULL;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Accepting inbound stream connection from `%s'\n",
GNUNET_NO);
sc = GNUNET_malloc (sizeof (struct StreamClient));
sc->socket = socket;
- sc->mst = GNUNET_SERVER_mst_create (&request_cb,
- sc);
- sc->rh = GNUNET_STREAM_read (sc->socket,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &process_request,
- sc);
GNUNET_CONTAINER_DLL_insert (sc_head,
sc_tail,
sc);
sc_count++;
refresh_timeout_task (sc);
- return GNUNET_OK;
+ return sc;
}
void
GSF_stream_start ()
{
+ static const struct GNUNET_MESH_MessageHandler handlers[] = {
+ { &request_cb, GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY, sizeof (struct StreamQueryMessage)},
+ { &reply_cb, GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY, 0 },
+ { NULL, 0, 0 }
+ };
+ static const uint32_t ports[] = {
+ GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+ 0
+ };
+
stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
if (GNUNET_YES ==
GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
"MAX_STREAM_CLIENTS",
&sc_count_max))
{
- listen_socket = GNUNET_STREAM_listen (GSF_cfg,
- GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
- &accept_cb, NULL,
- GNUNET_STREAM_OPTION_END);
+ listen_socket = GNUNET_MESH_connect (GSF_cfg,
+ NULL,
+ &accept_cb,
+ NULL /* FIXME: have a cleanup callback? */,
+ handlers,
+ ports);
}
}
terminate_stream (sc);
if (NULL != listen_socket)
{
- GNUNET_STREAM_listen_close (listen_socket);
+ GNUNET_MESH_disconnect (listen_socket);
listen_socket = NULL;
}
GNUNET_CONTAINER_multihashmap_iterate (stream_map,