* @author Christian Grothoff
*
* TODO:
- * - update comments on functions (still matches 'stream')
+ * - update comments on functions (still matches 'mesh')
* - MESH2 API doesn't allow flow control for server yet (needed!)
* - likely need to register clean up handler with mesh to handle
* client disconnect (likely leaky right now)
/**
- * A message in the queue to be written to the stream.
+ * A message in the queue to be written to the mesh.
*/
struct WriteQueueItem
{
/**
- * Information we keep around for each active streaming client.
+ * Information we keep around for each active meshing client.
*/
struct StreamClient
{
/**
- * Handle for a stream to another peer.
+ * Handle for a mesh to another peer.
*/
struct StreamHandle;
/**
- * Handle for a request that is going out via stream API.
+ * Handle for a request that is going out via mesh API.
*/
struct GSF_StreamRequest
{
struct GSF_StreamRequest *prev;
/**
- * Which stream is this request associated with?
+ * Which mesh is this request associated with?
*/
struct StreamHandle *sh;
/**
- * Handle for a stream to another peer.
+ * Handle for a mesh to another peer.
*/
struct StreamHandle
{
/**
- * Head of DLL of pending requests on this stream.
+ * Head of DLL of pending requests on this mesh.
*/
struct GSF_StreamRequest *pending_head;
/**
- * Tail of DLL of pending requests on this stream.
+ * Tail of DLL of pending requests on this mesh.
*/
struct GSF_StreamRequest *pending_tail;
/**
* Connection to the other peer.
*/
- struct GNUNET_MESH_Tunnel *stream;
+ struct GNUNET_MESH_Tunnel *mesh;
/**
* Handle for active write operation, or NULL.
struct GNUNET_MESH_TransmitHandle *wh;
/**
- * Which peer does this stream go to?
+ * Which peer does this mesh go to?
*/
struct GNUNET_PeerIdentity target;
/**
- * Task to kill inactive streams (we keep them around for
+ * Task to kill inactive meshs (we keep them around for
* a few seconds to give the application a chance to give
* us another query).
*/
GNUNET_SCHEDULER_TaskIdentifier timeout_task;
/**
- * Task to reset streams that had errors (asynchronously,
+ * Task to reset meshs that had errors (asynchronously,
* as we may not be able to do it immediately during a
- * callback from the stream API).
+ * callback from the mesh API).
*/
GNUNET_SCHEDULER_TaskIdentifier reset_task;
/**
- * Is this stream ready for transmission?
+ * Is this mesh ready for transmission?
*/
int is_ready;
static struct GNUNET_MESH_Handle *listen_socket;
/**
- * Head of DLL of stream clients.
+ * Head of DLL of mesh clients.
*/
static struct StreamClient *sc_head;
/**
- * Tail of DLL of stream clients.
+ * Tail of DLL of mesh clients.
*/
static struct StreamClient *sc_tail;
/**
- * Number of active stream clients in the 'sc_*'-DLL.
+ * Number of active mesh clients in the 'sc_*'-DLL.
*/
static unsigned int sc_count;
/**
- * Maximum allowed number of stream clients.
+ * Maximum allowed number of mesh clients.
*/
static unsigned long long sc_count_max;
/**
- * Map from peer identities to 'struct StreamHandles' with streams to
+ * Map from peer identities to 'struct StreamHandles' with meshs to
* those peers.
*/
-static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
+static struct GNUNET_CONTAINER_MultiHashMap *mesh_map;
/* ********************* client-side code ************************* */
sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
GNUNET_TIME_UNIT_FOREVER_ABS,
0, NULL);
- GSF_stream_query_cancel (sr);
+ GSF_mesh_query_cancel (sr);
return GNUNET_YES;
}
/**
- * Destroy a stream handle.
+ * Destroy a mesh handle.
*
- * @param sh stream to process
+ * @param sh mesh to process
*/
static void
-destroy_stream_handle (struct StreamHandle *sh)
+destroy_mesh_handle (struct StreamHandle *sh)
{
struct GSF_StreamRequest *sr;
sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
GNUNET_TIME_UNIT_FOREVER_ABS,
0, NULL);
- GSF_stream_query_cancel (sr);
+ GSF_mesh_query_cancel (sr);
}
GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
&free_waiting_entry,
GNUNET_SCHEDULER_cancel (sh->timeout_task);
if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
GNUNET_SCHEDULER_cancel (sh->reset_task);
- GNUNET_MESH_tunnel_destroy (sh->stream);
+ GNUNET_MESH_tunnel_destroy (sh->mesh);
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_remove (stream_map,
+ GNUNET_CONTAINER_multihashmap_remove (mesh_map,
&sh->target.hashPubKey,
sh));
GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
/**
- * Transmit pending requests via the stream.
+ * Transmit pending requests via the mesh.
*
- * @param sh stream to process
+ * @param sh mesh to process
*/
static void
transmit_pending (struct StreamHandle *sh);
/**
- * We had a serious error, tear down and re-create stream from scratch.
+ * We had a serious error, tear down and re-create mesh from scratch.
*
- * @param sh stream to reset
+ * @param sh mesh to reset
*/
static void
-reset_stream (struct StreamHandle *sh)
+reset_mesh (struct StreamHandle *sh)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Resetting stream to %s\n",
+ "Resetting mesh to %s\n",
GNUNET_i2s (&sh->target));
- GNUNET_MESH_tunnel_destroy (sh->stream);
+ GNUNET_MESH_tunnel_destroy (sh->mesh);
sh->is_ready = GNUNET_NO;
GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
&move_to_pending,
sh);
- sh->stream = GNUNET_MESH_tunnel_create (listen_socket,
+ sh->mesh = GNUNET_MESH_tunnel_create (listen_socket,
sh,
&sh->target,
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER);
/**
- * Task called when it is time to destroy an inactive stream.
+ * Task called when it is time to destroy an inactive mesh.
*
* @param cls the 'struct StreamHandle' to tear down
* @param tc scheduler context, unused
*/
static void
-stream_timeout (void *cls,
+mesh_timeout (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct StreamHandle *sh = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout on stream to %s\n",
+ "Timeout on mesh to %s\n",
GNUNET_i2s (&sh->target));
sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- destroy_stream_handle (sh);
+ destroy_mesh_handle (sh);
}
/**
- * Task called when it is time to reset an stream.
+ * Task called when it is time to reset an mesh.
*
* @param cls the 'struct StreamHandle' to tear down
* @param tc scheduler context, unused
*/
static void
-reset_stream_task (void *cls,
+reset_mesh_task (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct StreamHandle *sh = cls;
sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
- reset_stream (sh);
+ reset_mesh (sh);
}
/**
- * We had a serious error, tear down and re-create stream from scratch,
+ * We had a serious error, tear down and re-create mesh from scratch,
* but do so asynchronously.
*
- * @param sh stream to reset
+ * @param sh mesh to reset
*/
static void
-reset_stream_async (struct StreamHandle *sh)
+reset_mesh_async (struct StreamHandle *sh)
{
if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
GNUNET_SCHEDULER_cancel (sh->reset_task);
- sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
+ sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_mesh_task,
sh);
}
/**
* Functions of this signature are called whenever we are ready to transmit
- * query via a stream.
+ * query via a mesh.
*
* @param cls the struct StreamHandle for which we did the write call
* @param size the number of bytes that can be written to 'buf'
sh->wh = NULL;
if (NULL == buf)
{
- reset_stream (sh);
+ reset_mesh (sh);
return 0;
}
sr = sh->pending_head;
sr,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending query via stream to %s\n",
+ "Sending query via mesh to %s\n",
GNUNET_i2s (&sh->target));
sr->was_transmitted = GNUNET_YES;
sqm.header.size = htons (sizeof (sqm));
/**
- * Transmit pending requests via the stream.
+ * Transmit pending requests via the mesh.
*
- * @param sh stream to process
+ * @param sh mesh to process
*/
static void
transmit_pending (struct StreamHandle *sh)
{
if (NULL != sh->wh)
return;
- sh->wh = GNUNET_MESH_notify_transmit_ready (sh->stream, GNUNET_YES /* allow cork */,
+ sh->wh = GNUNET_MESH_notify_transmit_ready (sh->mesh, GNUNET_YES /* allow cork */,
GNUNET_TIME_UNIT_FOREVER_REL,
sizeof (struct StreamQueryMessage),
&transmit_sqm, sh);
hrc->expiration,
hrc->data_size,
hrc->data);
- GSF_stream_query_cancel (sr);
+ GSF_mesh_query_cancel (sr);
hrc->found = GNUNET_YES;
return GNUNET_YES;
}
* complete reply is received.
*
* @param cls closure with the 'struct StreamHandle'
- * @param client identification of the client, NULL
+ * @param tunnel tunnel handle
+ * @param tunnel_ctx tunnel context
* @param message the actual message
* @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
*/
reply_cb (void *cls,
struct GNUNET_MESH_Tunnel *tunnel,
void **tunnel_ctx,
- const struct GNUNET_PeerIdentity *sender,
- const struct GNUNET_MessageHeader *message)
+ const struct GNUNET_MessageHeader *message)
{
struct StreamHandle *sh = *tunnel_ctx;
const struct StreamReplyMessage *srm;
if (sizeof (struct StreamReplyMessage) > msize)
{
GNUNET_break_op (0);
- reset_stream_async (sh);
+ reset_mesh_async (sh);
return GNUNET_SYSERR;
}
srm = (const struct StreamReplyMessage *) message;
&srm[1], msize, &query))
{
GNUNET_break_op (0);
- reset_stream_async (sh);
+ reset_mesh_async (sh);
return GNUNET_SYSERR;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received reply `%s' via stream\n",
+ "Received reply `%s' via mesh\n",
GNUNET_h2s (&query));
GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# replies received via stream"), 1,
+ gettext_noop ("# replies received via mesh"), 1,
GNUNET_NO);
hrc.data = &srm[1];
hrc.data_size = msize;
if (GNUNET_NO == hrc.found)
{
GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# replies received via stream dropped"), 1,
+ gettext_noop ("# replies received via mesh dropped"), 1,
GNUNET_NO);
return GNUNET_OK;
}
/**
- * Get (or create) a stream to talk to the given peer.
+ * Get (or create) a mesh to talk to the given peer.
*
* @param target peer we want to communicate with
*/
static struct StreamHandle *
-get_stream (const struct GNUNET_PeerIdentity *target)
+get_mesh (const struct GNUNET_PeerIdentity *target)
{
struct StreamHandle *sh;
- sh = GNUNET_CONTAINER_multihashmap_get (stream_map,
+ sh = GNUNET_CONTAINER_multihashmap_get (mesh_map,
&target->hashPubKey);
if (NULL != sh)
{
return sh;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Creating stream to %s\n",
+ "Creating mesh to %s\n",
GNUNET_i2s (target));
sh = GNUNET_malloc (sizeof (struct StreamHandle));
sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
- &reset_stream_task,
+ &reset_mesh_task,
sh);
sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
sh->target = *target;
- sh->stream = GNUNET_MESH_tunnel_create (listen_socket,
+ sh->mesh = GNUNET_MESH_tunnel_create (listen_socket,
sh,
&sh->target,
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER);
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_put (stream_map,
+ GNUNET_CONTAINER_multihashmap_put (mesh_map,
&sh->target.hashPubKey,
sh,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
* @return handle to cancel the operation
*/
struct GSF_StreamRequest *
-GSF_stream_query (const struct GNUNET_PeerIdentity *target,
+GSF_mesh_query (const struct GNUNET_PeerIdentity *target,
const struct GNUNET_HashCode *query,
enum GNUNET_BLOCK_Type type,
GSF_StreamReplyProcessor proc, void *proc_cls)
struct GSF_StreamRequest *sr;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Preparing to send query for %s via stream to %s\n",
+ "Preparing to send query for %s via mesh to %s\n",
GNUNET_h2s (query),
GNUNET_i2s (target));
- sh = get_stream (target);
+ sh = get_mesh (target);
sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
sr->sh = sh;
sr->proc = proc;
* @param sr request to cancel
*/
void
-GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
+GSF_mesh_query_cancel (struct GSF_StreamRequest *sr)
{
struct StreamHandle *sh = sr->sh;
if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
(NULL == sh->pending_head) )
sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
- &stream_timeout,
+ &mesh_timeout,
sh);
}
* @param sc client to clean up
*/
static void
-terminate_stream (struct StreamClient *sc)
+terminate_mesh (struct StreamClient *sc)
{
GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# stream connections active"), -1,
+ gettext_noop ("# mesh connections active"), -1,
GNUNET_NO);
if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
GNUNET_SCHEDULER_cancel (sc->terminate_task);
/**
- * Task run to asynchronously terminate the stream due to timeout.
+ * Task run to asynchronously terminate the mesh due to timeout.
*
* @param cls the 'struct StreamClient'
* @param tc scheduler context
*/
static void
-timeout_stream_task (void *cls,
+timeout_mesh_task (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct StreamClient *sc = cls;
sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- terminate_stream (sc);
+ terminate_mesh (sc);
}
/**
- * Reset the timeout for the stream client (due to activity).
+ * Reset the timeout for the mesh client (due to activity).
*
* @param sc client handle to reset timeout for
*/
if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
GNUNET_SCHEDULER_cancel (sc->timeout_task);
sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
- &timeout_stream_task,
+ &timeout_mesh_task,
sc);
}
if (0 == size)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmission of reply failed, terminating stream\n");
- terminate_stream (sc);
+ "Transmission of reply failed, terminating mesh\n");
+ terminate_mesh (sc);
return 0;
}
GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
sc->wqi_tail,
wqi);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitted %u byte reply via stream\n",
+ "Transmitted %u byte reply via mesh\n",
(unsigned int) size);
GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# Blocks transferred via stream"), 1,
+ gettext_noop ("# Blocks transferred via mesh"), 1,
GNUNET_NO);
memcpy (buf, &wqi[1], ret = wqi->msize);
GNUNET_free (wqi);
if (NULL == sc->wh)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Write failed; terminating stream\n");
- terminate_stream (sc);
+ "Write failed; terminating mesh\n");
+ terminate_mesh (sc);
return;
}
}
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Starting transmission of %u byte reply for query `%s' via stream\n",
+ "Starting transmission of %u byte reply for query `%s' via mesh\n",
(unsigned int) size,
GNUNET_h2s (key));
wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
* Do not call GNUNET_SERVER_mst_destroy in callback
*
* @param cls closure with the 'struct StreamClient'
- * @param client identification of the client, NULL
+ * @param tunnel tunnel handle
+ * @param tunnel_ctx tunnel context
* @param message the actual message
* @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
*/
request_cb (void *cls,
struct GNUNET_MESH_Tunnel *tunnel,
void **tunnel_ctx,
- const struct GNUNET_PeerIdentity *sender,
const struct GNUNET_MessageHeader *message)
{
struct StreamClient *sc = *tunnel_ctx;
sqm = (const struct StreamQueryMessage *) message;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received query for `%s' via stream\n",
+ "Received query for `%s' via mesh\n",
GNUNET_h2s (&sqm->query));
GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# queries received via stream"), 1,
+ gettext_noop ("# queries received via mesh"), 1,
GNUNET_NO);
refresh_timeout_task (sc);
sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
/**
- * Functions of this type are called upon new stream connection from other peers
+ * Functions of this type are called upon new mesh connection from other peers
* or upon binding error which happen when the app_port given in
* GNUNET_STREAM_listen() is already taken.
*
* @param cls the closure from GNUNET_STREAM_listen
- * @param socket the socket representing the stream
- * @param initiator the identity of the peer who wants to establish a stream
+ * @param socket the socket representing the mesh
+ * @param initiator the identity of the peer who wants to establish a mesh
* with us; NULL on binding error
* @return initial tunnel context (our 'struct StreamClient')
*/
if (sc_count >= sc_count_max)
{
GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# stream client connections rejected"), 1,
+ gettext_noop ("# mesh client connections rejected"), 1,
GNUNET_NO);
GNUNET_MESH_tunnel_destroy (socket);
return NULL;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Accepting inbound stream connection from `%s'\n",
+ "Accepting inbound mesh connection from `%s'\n",
GNUNET_i2s (initiator));
GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# stream connections active"), 1,
+ gettext_noop ("# mesh connections active"), 1,
GNUNET_NO);
sc = GNUNET_malloc (sizeof (struct StreamClient));
sc->socket = socket;
* Initialize subsystem for non-anonymous file-sharing.
*/
void
-GSF_stream_start ()
+GSF_mesh_start ()
{
static const struct GNUNET_MESH_MessageHandler handlers[] = {
{ &request_cb, GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY, sizeof (struct StreamQueryMessage)},
0
};
- stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
+ mesh_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
if (GNUNET_YES ==
GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
"fs",
/**
- * Function called on each active streams to shut them down.
+ * Function called on each active meshs to shut them down.
*
* @param cls NULL
* @param key target peer, unused
* @return GNUNET_YES (continue to iterate)
*/
static int
-release_streams (void *cls,
+release_meshs (void *cls,
const struct GNUNET_HashCode *key,
void *value)
{
struct StreamHandle *sh = value;
- destroy_stream_handle (sh);
+ destroy_mesh_handle (sh);
return GNUNET_YES;
}
* Shutdown subsystem for non-anonymous file-sharing.
*/
void
-GSF_stream_stop ()
+GSF_mesh_stop ()
{
struct StreamClient *sc;
while (NULL != (sc = sc_head))
- terminate_stream (sc);
+ terminate_mesh (sc);
if (NULL != listen_socket)
{
GNUNET_MESH_disconnect (listen_socket);
listen_socket = NULL;
}
- GNUNET_CONTAINER_multihashmap_iterate (stream_map,
- &release_streams,
+ GNUNET_CONTAINER_multihashmap_iterate (mesh_map,
+ &release_meshs,
NULL);
- GNUNET_CONTAINER_multihashmap_destroy (stream_map);
- stream_map = NULL;
+ GNUNET_CONTAINER_multihashmap_destroy (mesh_map);
+ mesh_map = NULL;
}
-/* end of gnunet-service-fs_stream.c */
+/* end of gnunet-service-fs_mesh.c */