* @file fs/gnunet-service-fs_stream.c
* @brief non-anonymous file-transfer
* @author Christian Grothoff
- *
- * TODO:
- * - limit # concurrent clients, have timeouts for server-side
*/
#include "platform.h"
#include "gnunet_constants.h"
#include "gnunet-service-fs_indexing.h"
#include "gnunet-service-fs_stream.h"
+/**
+ * After how long do we termiante idle connections?
+ */
+#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
+
+
/**
* Information we keep around for each active streaming client.
*/
*/
GNUNET_SCHEDULER_TaskIdentifier terminate_task;
+ /**
+ * Task that is scheduled to terminate idle connections.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
/**
* Size of the last write that was initiated.
*/
struct GSF_StreamRequest *pending_tail;
/**
- * Head of DLL of requests waiting for a reply on this stream.
- */
- struct GSF_StreamRequest *waiting_head;
-
- /**
- * Tail of DLL of requests waiting for a reply on this stream.
+ * Map from query to 'struct GSF_StreamRequest's waiting for
+ * a reply.
*/
- struct GSF_StreamRequest *waiting_tail;
+ struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
/**
* Connection to the other peer.
*/
static struct StreamClient *sc_tail;
+/**
+ * Number of active stream clients in the 'sc_*'-DLL.
+ */
+static unsigned int sc_count;
+
+/**
+ * Maximum allowed number of stream clients.
+ */
+static unsigned long long sc_count_max;
+
/**
* Map from peer identities to 'struct StreamHandles' with streams to
* those peers.
/* ********************* client-side code ************************* */
+/**
+ * Iterator called on each entry in a waiting map to
+ * call the 'proc' continuation and release associated
+ * resources.
+ *
+ * @param cls the 'struct StreamHandle'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_StreamRequest' to clean up
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+free_waiting_entry (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct GSF_StreamRequest *sr = value;
+
+ sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ 0, NULL);
+ GSF_stream_query_cancel (sr);
+ return GNUNET_YES;
+}
+
/**
* Destroy a stream handle.
0, NULL);
GSF_stream_query_cancel (sr);
}
- while (NULL != (sr = sh->waiting_head))
- {
- sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
- GNUNET_TIME_UNIT_FOREVER_ABS,
- 0, NULL);
- GSF_stream_query_cancel (sr);
- }
+ GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
+ &free_waiting_entry,
+ sh);
if (NULL != sh->wh)
GNUNET_STREAM_io_write_cancel (sh->wh);
if (NULL != sh->rh)
GNUNET_CONTAINER_multihashmap_remove (stream_map,
&sh->target.hashPubKey,
sh));
+ GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
GNUNET_free (sh);
}
}
+/**
+ * Iterator called on each entry in a waiting map to
+ * move it back to the pending list.
+ *
+ * @param cls the 'struct StreamHandle'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_StreamRequest' to move to pending
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+move_to_pending (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct StreamHandle *sh = cls;
+ struct GSF_StreamRequest *sr = value;
+
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
+ key,
+ value));
+ GNUNET_CONTAINER_DLL_insert (sh->pending_head,
+ sh->pending_tail,
+ sr);
+ sr->was_transmitted = GNUNET_NO;
+ return GNUNET_YES;
+}
+
+
/**
* We had a serious error, tear down and re-create stream from scratch.
*
static void
reset_stream (struct StreamHandle *sh)
{
- struct GSF_StreamRequest *sr;
-
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Resetting stream to %s\n",
+ GNUNET_i2s (&sh->target));
if (NULL != sh->rh)
GNUNET_STREAM_io_read_cancel (sh->rh);
GNUNET_STREAM_close (sh->stream);
sh->is_ready = GNUNET_NO;
- while (NULL != (sr = sh->waiting_tail))
- {
- GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
- sh->waiting_tail,
- sr);
- GNUNET_CONTAINER_DLL_insert (sh->pending_head,
- sh->pending_tail,
- sr);
- sr->was_transmitted = GNUNET_NO;
- }
+ GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
+ &move_to_pending,
+ sh);
sh->stream = GNUNET_STREAM_open (GSF_cfg,
&sh->target,
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
{
struct StreamHandle *sh = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Timeout on stream to %s\n",
+ GNUNET_i2s (&sh->target));
sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
destroy_stream_handle (sh);
}
struct StreamHandle *sh = cls;
sh->rh = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received %u bytes from stream to %s\n",
+ (unsigned int) size,
+ GNUNET_i2s (&sh->target));
if (GNUNET_SYSERR ==
GNUNET_SERVER_mst_receive (sh->mst,
NULL,
reset_stream (sh);
return;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Successfully transmitted %u bytes via stream to %s\n",
+ (unsigned int) size,
+ GNUNET_i2s (&sh->target));
if (NULL == sh->rh)
sh->rh = GNUNET_STREAM_read (sh->stream,
GNUNET_TIME_UNIT_FOREVER_REL,
GNUNET_CONTAINER_DLL_remove (sh->pending_head,
sh->pending_tail,
sr);
- GNUNET_CONTAINER_DLL_insert_tail (sh->waiting_head,
- sh->waiting_tail,
- sr);
+ GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
+ &sr->query,
+ sr,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending query via stream to %s\n",
+ GNUNET_i2s (&sh->target));
sr->was_transmitted = GNUNET_YES;
sqm.header.size = htons (sizeof (sqm));
sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
}
+/**
+ * Closure for 'handle_reply'.
+ */
+struct HandleReplyClosure
+{
+
+ /**
+ * Reply payload.
+ */
+ const void *data;
+
+ /**
+ * Expiration time for the block.
+ */
+ struct GNUNET_TIME_Absolute expiration;
+
+ /**
+ * Number of bytes in 'data'.
+ */
+ size_t data_size;
+
+ /**
+ * Type of the block.
+ */
+ enum GNUNET_BLOCK_Type type;
+
+ /**
+ * Did we have a matching query?
+ */
+ int found;
+};
+
+
+/**
+ * Iterator called on each entry in a waiting map to
+ * process a result.
+ *
+ * @param cls the 'struct HandleReplyClosure'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_StreamRequest' to handle result for
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+handle_reply (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct HandleReplyClosure *hrc = cls;
+ struct GSF_StreamRequest *sr = value;
+
+ sr->proc (sr->proc_cls,
+ hrc->type,
+ hrc->expiration,
+ hrc->data_size,
+ hrc->data);
+ GSF_stream_query_cancel (sr);
+ hrc->found = GNUNET_YES;
+ return GNUNET_YES;
+}
+
+
/**
* Functions with this signature are called whenever a
* complete reply is received.
{
struct StreamHandle *sh = cls;
const struct StreamReplyMessage *srm;
+ struct HandleReplyClosure hrc;
uint16_t msize;
enum GNUNET_BLOCK_Type type;
struct GNUNET_HashCode query;
- struct GSF_StreamRequest *sr;
msize = ntohs (message->size);
switch (ntohs (message->type))
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# replies received via stream"), 1,
GNUNET_NO);
- for (sr = sh->waiting_head; NULL != sr; sr = sr->next)
- if (0 == memcmp (&query,
- &sr->query,
- sizeof (struct GNUNET_HashCode)))
- break;
- if (NULL == sr)
+ hrc.data = &srm[1];
+ hrc.data_size = msize;
+ hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
+ hrc.type = type;
+ hrc.found = GNUNET_NO;
+ GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
+ &query,
+ &handle_reply,
+ &hrc);
+ if (GNUNET_NO == hrc.found)
{
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# replies received via stream dropped"), 1,
GNUNET_NO);
return GNUNET_OK;
}
- sr->proc (sr->proc_cls,
- type,
- GNUNET_TIME_absolute_ntoh (srm->expiration),
- msize,
- &srm[1]);
- GSF_stream_query_cancel (sr);
return GNUNET_OK;
default:
GNUNET_break_op (0);
}
return sh;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating stream to %s\n",
+ GNUNET_i2s (target));
sh = GNUNET_malloc (sizeof (struct StreamHandle));
sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
sh);
+ sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
sh->target = *target;
sh->stream = GNUNET_STREAM_open (GSF_cfg,
&sh->target,
struct StreamHandle *sh;
struct GSF_StreamRequest *sr;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Preparing to send query for %s via stream to %s\n",
+ GNUNET_h2s (query),
+ GNUNET_i2s (target));
sh = get_stream (target);
sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
sr->sh = sh;
struct StreamHandle *sh = sr->sh;
if (GNUNET_YES == sr->was_transmitted)
- GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
- sh->waiting_tail,
- sr);
+ GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
+ &sr->query,
+ sr);
else
GNUNET_CONTAINER_DLL_remove (sh->pending_head,
sh->pending_tail,
sr);
GNUNET_free (sr);
- if ( (NULL == sh->waiting_head) &&
+ if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
(NULL == sh->pending_head) )
sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
&stream_timeout,
GNUNET_NO);
if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
GNUNET_SCHEDULER_cancel (sc->terminate_task);
+ if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
+ GNUNET_SCHEDULER_cancel (sc->timeout_task);
if (NULL != sc->rh)
GNUNET_STREAM_io_read_cancel (sc->rh);
if (NULL != sc->wh)
GNUNET_CONTAINER_DLL_remove (sc_head,
sc_tail,
sc);
+ sc_count--;
GNUNET_free (sc);
}
}
+/**
+ * Task run to asynchronously terminate the stream due to timeout.
+ *
+ * @param cls the 'struct StreamClient'
+ * @param tc scheduler context
+ */
+static void
+timeout_stream_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct StreamClient *sc = cls;
+
+ sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ terminate_stream (sc);
+}
+
+
+/**
+ * Reset the timeout for the stream client (due to activity).
+ *
+ * @param sc client handle to reset timeout for
+ */
+static void
+refresh_timeout_task (struct StreamClient *sc)
+{
+ if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
+ GNUNET_SCHEDULER_cancel (sc->timeout_task);
+ sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
+ &timeout_stream_task,
+ sc);
+}
+
+
/**
* We had a serious error, termiante stream,
* but do so asynchronously.
GNUNET_NO, GNUNET_YES);
if (GNUNET_NO == ret)
return;
+ refresh_timeout_task (sc);
sc->rh = GNUNET_STREAM_read (sc->socket,
GNUNET_TIME_UNIT_FOREVER_REL,
&process_request,
int ret;
sc->rh = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received %u byte query via stream\n",
+ (unsigned int) size);
switch (status)
{
case GNUNET_STREAM_OK:
* Sending a reply was completed, continue processing.
*
* @param cls closure with the struct StreamClient which sent the query
+ * @param status result code for the operation
+ * @param size number of bytes that were transmitted
*/
static void
write_continuation (void *cls,
if ( (GNUNET_STREAM_OK == status) &&
(size == sc->reply_size) )
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmitted %u byte reply via stream\n",
+ (unsigned int) size);
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# Blocks transferred via stream"), 1,
GNUNET_NO);
continue_reading (sc);
}
else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmission of reply failed, terminating stream\n");
terminate_stream (sc);
+ }
}
continue_reading (sc);
return;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Starting transmission of %u byte reply via stream\n",
+ (unsigned int) size);
srm->header.size = htons ((uint16_t) msize);
srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
srm->type = htonl (type);
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# queries received via stream"), 1,
GNUNET_NO);
+ refresh_timeout_task (sc);
sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
0,
&sqm->query,
if (NULL == socket)
return GNUNET_SYSERR;
+ if (sc_count >= sc_count_max)
+ {
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# stream client connections rejected"), 1,
+ GNUNET_NO);
+ return GNUNET_SYSERR;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Accepting inbound stream connection from `%s'\n",
+ GNUNET_i2s (initiator));
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# stream connections active"), 1,
GNUNET_NO);
GNUNET_CONTAINER_DLL_insert (sc_head,
sc_tail,
sc);
+ sc_count++;
+ refresh_timeout_task (sc);
return GNUNET_OK;
}
GSF_stream_start ()
{
stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
- listen_socket = GNUNET_STREAM_listen (GSF_cfg,
- GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
- &accept_cb, NULL,
- GNUNET_STREAM_OPTION_END);
+ if (GNUNET_YES ==
+ GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
+ "fs",
+ "MAX_STREAM_CLIENTS",
+ &sc_count_max))
+ {
+ listen_socket = GNUNET_STREAM_listen (GSF_cfg,
+ GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+ &accept_cb, NULL,
+ GNUNET_STREAM_OPTION_END);
+ }
}