-options to play with
[oweals/gnunet.git] / src / fs / gnunet-service-fs_stream.c
index b444e282ccf43b39f7726df351cc73b63b80c4a5..4daf21e448f03cf4f817b8c70b3ff1d6b77adfb5 100644 (file)
@@ -22,9 +22,6 @@
  * @file fs/gnunet-service-fs_stream.c
  * @brief non-anonymous file-transfer
  * @author Christian Grothoff
- *
- * TODO:
- * - limit # concurrent clients, have timeouts for server-side
  */
 #include "platform.h"
 #include "gnunet_constants.h"
 #include "gnunet-service-fs_indexing.h"
 #include "gnunet-service-fs_stream.h"
 
+/**
+ * After how long do we termiante idle connections?
+ */
+#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
+
+
 /**
  * Information we keep around for each active streaming client.
  */
@@ -81,6 +84,11 @@ struct StreamClient
    */
   GNUNET_SCHEDULER_TaskIdentifier terminate_task;
 
+  /**
+   * Task that is scheduled to terminate idle connections.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
   /**
    * Size of the last write that was initiated.
    */ 
@@ -210,14 +218,10 @@ struct StreamHandle
   struct GSF_StreamRequest *pending_tail;
 
   /**
-   * Head of DLL of requests waiting for a reply on this stream.
-   */
-  struct GSF_StreamRequest *waiting_head;
-
-  /**
-   * Tail of DLL of requests waiting for a reply on this stream.
+   * Map from query to 'struct GSF_StreamRequest's waiting for
+   * a reply.
    */
-  struct GSF_StreamRequest *waiting_tail;
+  struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
 
   /**
    * Connection to the other peer.
@@ -281,6 +285,16 @@ static struct StreamClient *sc_head;
  */ 
 static struct StreamClient *sc_tail;
 
+/**
+ * Number of active stream clients in the 'sc_*'-DLL.
+ */
+static unsigned int sc_count;
+
+/**
+ * Maximum allowed number of stream clients.
+ */
+static unsigned long long sc_count_max;
+
 /**
  * Map from peer identities to 'struct StreamHandles' with streams to
  * those peers.
@@ -290,6 +304,30 @@ static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
 
 /* ********************* client-side code ************************* */
 
+/**
+ * Iterator called on each entry in a waiting map to 
+ * call the 'proc' continuation and release associated
+ * resources.
+ *
+ * @param cls the 'struct StreamHandle'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_StreamRequest' to clean up
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+free_waiting_entry (void *cls,
+                   const struct GNUNET_HashCode *key,
+                   void *value)
+{
+  struct GSF_StreamRequest *sr = value;
+
+  sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
+           GNUNET_TIME_UNIT_FOREVER_ABS,
+           0, NULL);
+  GSF_stream_query_cancel (sr);
+  return GNUNET_YES;
+}
+
 
 /**
  * Destroy a stream handle.
@@ -308,13 +346,9 @@ destroy_stream_handle (struct StreamHandle *sh)
              0, NULL);
     GSF_stream_query_cancel (sr);
   }
-  while (NULL != (sr = sh->waiting_head))
-  {
-    sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
-             GNUNET_TIME_UNIT_FOREVER_ABS,
-             0, NULL);
-    GSF_stream_query_cancel (sr);
-  }
+  GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
+                                        &free_waiting_entry,
+                                        sh);
   if (NULL != sh->wh)
     GNUNET_STREAM_io_write_cancel (sh->wh);
   if (NULL != sh->rh)
@@ -326,6 +360,7 @@ destroy_stream_handle (struct StreamHandle *sh)
                 GNUNET_CONTAINER_multihashmap_remove (stream_map,
                                                       &sh->target.hashPubKey,
                                                       sh));
+  GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
   GNUNET_free (sh);
 }
 
@@ -356,6 +391,35 @@ stream_ready_cb (void *cls,
 }
 
 
+/**
+ * Iterator called on each entry in a waiting map to 
+ * move it back to the pending list.
+ *
+ * @param cls the 'struct StreamHandle'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_StreamRequest' to move to pending
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+move_to_pending (void *cls,
+                const struct GNUNET_HashCode *key,
+                void *value)
+{
+  struct StreamHandle *sh = cls;
+  struct GSF_StreamRequest *sr = value;
+  
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
+                                                      key,
+                                                      value));
+  GNUNET_CONTAINER_DLL_insert (sh->pending_head,
+                              sh->pending_tail,
+                              sr);
+  sr->was_transmitted = GNUNET_NO;
+  return GNUNET_YES;
+}
+
+
 /**
  * We had a serious error, tear down and re-create stream from scratch.
  *
@@ -364,22 +428,16 @@ stream_ready_cb (void *cls,
 static void
 reset_stream (struct StreamHandle *sh)
 {
-  struct GSF_StreamRequest *sr;
-  
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Resetting stream to %s\n",
+             GNUNET_i2s (&sh->target));
   if (NULL != sh->rh)
     GNUNET_STREAM_io_read_cancel (sh->rh);
   GNUNET_STREAM_close (sh->stream);
   sh->is_ready = GNUNET_NO;
-  while (NULL != (sr = sh->waiting_tail))
-  {
-    GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
-                                sh->waiting_tail,
-                                sr);
-    GNUNET_CONTAINER_DLL_insert (sh->pending_head,
-                                sh->pending_tail,
-                                sr);
-    sr->was_transmitted = GNUNET_NO;
-  }
+  GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
+                                        &move_to_pending,
+                                        sh);
   sh->stream = GNUNET_STREAM_open (GSF_cfg,
                                   &sh->target,
                                   GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
@@ -400,6 +458,9 @@ stream_timeout (void *cls,
 {
   struct StreamHandle *sh = cls;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Timeout on stream to %s\n",
+             GNUNET_i2s (&sh->target));
   sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
   destroy_stream_handle (sh);
 }
@@ -456,6 +517,10 @@ handle_stream_reply (void *cls,
   struct StreamHandle *sh = cls;
 
   sh->rh = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received %u bytes from stream to %s\n",
+             (unsigned int) size,
+             GNUNET_i2s (&sh->target));
   if (GNUNET_SYSERR == 
       GNUNET_SERVER_mst_receive (sh->mst,
                                 NULL,
@@ -499,6 +564,10 @@ query_write_continuation (void *cls,
     reset_stream (sh);
     return;
   }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Successfully transmitted %u bytes via stream to %s\n",
+             (unsigned int) size,
+             GNUNET_i2s (&sh->target));
   if (NULL == sh->rh)
     sh->rh = GNUNET_STREAM_read (sh->stream,
                                 GNUNET_TIME_UNIT_FOREVER_REL,
@@ -527,9 +596,13 @@ transmit_pending (struct StreamHandle *sh)
   GNUNET_CONTAINER_DLL_remove (sh->pending_head,
                               sh->pending_tail,
                               sr);
-  GNUNET_CONTAINER_DLL_insert_tail (sh->waiting_head,
-                                   sh->waiting_tail,
-                                   sr);
+  GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
+                                    &sr->query,
+                                    sr,
+                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Sending query via stream to %s\n",
+             GNUNET_i2s (&sh->target));
   sr->was_transmitted = GNUNET_YES;
   sqm.header.size = htons (sizeof (sqm));
   sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
@@ -543,6 +616,67 @@ transmit_pending (struct StreamHandle *sh)
 }
 
 
+/**
+ * Closure for 'handle_reply'.
+ */
+struct HandleReplyClosure
+{
+
+  /**
+   * Reply payload.
+   */ 
+  const void *data;
+
+  /**
+   * Expiration time for the block.
+   */
+  struct GNUNET_TIME_Absolute expiration;
+
+  /**
+   * Number of bytes in 'data'.
+   */
+  size_t data_size;
+
+  /** 
+   * Type of the block.
+   */
+  enum GNUNET_BLOCK_Type type;
+  
+  /**
+   * Did we have a matching query?
+   */
+  int found;
+};
+
+
+/**
+ * Iterator called on each entry in a waiting map to 
+ * process a result.
+ *
+ * @param cls the 'struct HandleReplyClosure'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_StreamRequest' to handle result for
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+handle_reply (void *cls,
+             const struct GNUNET_HashCode *key,
+             void *value)
+{
+  struct HandleReplyClosure *hrc = cls;
+  struct GSF_StreamRequest *sr = value;
+  
+  sr->proc (sr->proc_cls,
+           hrc->type,
+           hrc->expiration,
+           hrc->data_size,
+           hrc->data);
+  GSF_stream_query_cancel (sr);
+  hrc->found = GNUNET_YES;
+  return GNUNET_YES;
+}
+
+
 /**
  * Functions with this signature are called whenever a
  * complete reply is received.
@@ -561,10 +695,10 @@ reply_cb (void *cls,
 {
   struct StreamHandle *sh = cls;
   const struct StreamReplyMessage *srm;
+  struct HandleReplyClosure hrc;
   uint16_t msize;
   enum GNUNET_BLOCK_Type type;
   struct GNUNET_HashCode query;
-  struct GSF_StreamRequest *sr;
 
   msize = ntohs (message->size);
   switch (ntohs (message->type))
@@ -594,24 +728,22 @@ reply_cb (void *cls,
     GNUNET_STATISTICS_update (GSF_stats,
                              gettext_noop ("# replies received via stream"), 1,
                              GNUNET_NO);
-    for (sr = sh->waiting_head; NULL != sr; sr = sr->next)
-      if (0 == memcmp (&query,
-                      &sr->query,
-                      sizeof (struct GNUNET_HashCode)))
-       break;
-    if (NULL == sr)
+    hrc.data = &srm[1];
+    hrc.data_size = msize;
+    hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
+    hrc.type = type;
+    hrc.found = GNUNET_NO;
+    GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
+                                               &query,
+                                               &handle_reply,
+                                               &hrc);
+    if (GNUNET_NO == hrc.found)
     {
       GNUNET_STATISTICS_update (GSF_stats,
                                gettext_noop ("# replies received via stream dropped"), 1,
                                GNUNET_NO);
       return GNUNET_OK;
     }
-    sr->proc (sr->proc_cls,
-             type,
-             GNUNET_TIME_absolute_ntoh (srm->expiration),
-             msize,
-             &srm[1]);
-    GSF_stream_query_cancel (sr);
     return GNUNET_OK;
   default:
     GNUNET_break_op (0);
@@ -642,9 +774,13 @@ get_stream (const struct GNUNET_PeerIdentity *target)
     }
     return sh;
   }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Creating stream to %s\n",
+             GNUNET_i2s (target));
   sh = GNUNET_malloc (sizeof (struct StreamHandle));
   sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
                                      sh);
+  sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
   sh->target = *target;
   sh->stream = GNUNET_STREAM_open (GSF_cfg,
                                   &sh->target,
@@ -679,6 +815,10 @@ GSF_stream_query (const struct GNUNET_PeerIdentity *target,
   struct StreamHandle *sh;
   struct GSF_StreamRequest *sr;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Preparing to send query for %s via stream to %s\n",
+             GNUNET_h2s (query),
+             GNUNET_i2s (target));
   sh = get_stream (target);
   sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
   sr->sh = sh;
@@ -707,15 +847,15 @@ GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
   struct StreamHandle *sh = sr->sh;
 
   if (GNUNET_YES == sr->was_transmitted)
-    GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
-                                sh->waiting_tail,
-                                sr);
+    GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
+                                         &sr->query,
+                                         sr);
   else
     GNUNET_CONTAINER_DLL_remove (sh->pending_head,
                                 sh->pending_tail,
                                 sr);
   GNUNET_free (sr);
-  if ( (NULL == sh->waiting_head) &&
+  if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
        (NULL == sh->pending_head) )
     sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
                                                     &stream_timeout,
@@ -739,6 +879,8 @@ terminate_stream (struct StreamClient *sc)
                            GNUNET_NO);
   if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
     GNUNET_SCHEDULER_cancel (sc->terminate_task); 
+  if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
+    GNUNET_SCHEDULER_cancel (sc->timeout_task); 
  if (NULL != sc->rh)
     GNUNET_STREAM_io_read_cancel (sc->rh);
   if (NULL != sc->wh)
@@ -750,6 +892,7 @@ terminate_stream (struct StreamClient *sc)
   GNUNET_CONTAINER_DLL_remove (sc_head,
                               sc_tail,
                               sc);
+  sc_count--;
   GNUNET_free (sc);
 }
 
@@ -771,6 +914,39 @@ terminate_stream_task (void *cls,
 }
 
 
+/**
+ * Task run to asynchronously terminate the stream due to timeout.
+ *
+ * @param cls the 'struct StreamClient'
+ * @param tc scheduler context
+ */ 
+static void
+timeout_stream_task (void *cls,
+                    const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct StreamClient *sc = cls;
+
+  sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  terminate_stream (sc);
+}
+
+
+/**
+ * Reset the timeout for the stream client (due to activity).
+ *
+ * @param sc client handle to reset timeout for
+ */
+static void
+refresh_timeout_task (struct StreamClient *sc)
+{
+  if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
+    GNUNET_SCHEDULER_cancel (sc->timeout_task); 
+  sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
+                                                  &timeout_stream_task,
+                                                  sc);
+}
+
+
 /**
  * We had a serious error, termiante stream,
  * but do so asynchronously.
@@ -821,6 +997,7 @@ continue_reading (struct StreamClient *sc)
                               GNUNET_NO, GNUNET_YES);
   if (GNUNET_NO == ret)
     return; 
+  refresh_timeout_task (sc);
   sc->rh = GNUNET_STREAM_read (sc->socket,
                               GNUNET_TIME_UNIT_FOREVER_REL,
                               &process_request,
@@ -849,6 +1026,9 @@ process_request (void *cls,
   int ret;
 
   sc->rh = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received %u byte query via stream\n",
+             (unsigned int) size);
   switch (status)
   {
   case GNUNET_STREAM_OK:
@@ -885,6 +1065,8 @@ process_request (void *cls,
  * Sending a reply was completed, continue processing.
  *
  * @param cls closure with the struct StreamClient which sent the query
+ * @param status result code for the operation
+ * @param size number of bytes that were transmitted
  */
 static void
 write_continuation (void *cls,
@@ -897,13 +1079,20 @@ write_continuation (void *cls,
   if ( (GNUNET_STREAM_OK == status) &&
        (size == sc->reply_size) )
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Transmitted %u byte reply via stream\n",
+               (unsigned int) size);
     GNUNET_STATISTICS_update (GSF_stats,
                              gettext_noop ("# Blocks transferred via stream"), 1,
                              GNUNET_NO);
     continue_reading (sc);
   }
   else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Transmission of reply failed, terminating stream\n");
     terminate_stream (sc);    
+  }
 }
 
 
@@ -957,6 +1146,9 @@ handle_datastore_reply (void *cls,
     continue_reading (sc);
     return;
   }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Starting transmission of %u byte reply via stream\n",
+             (unsigned int) size);
   srm->header.size = htons ((uint16_t) msize);
   srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
   srm->type = htonl (type);
@@ -1012,6 +1204,7 @@ request_cb (void *cls,
     GNUNET_STATISTICS_update (GSF_stats,
                              gettext_noop ("# queries received via stream"), 1,
                              GNUNET_NO);
+    refresh_timeout_task (sc);
     sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
                                       0,
                                       &sqm->query,
@@ -1052,6 +1245,16 @@ accept_cb (void *cls,
 
   if (NULL == socket)
     return GNUNET_SYSERR;
+  if (sc_count >= sc_count_max)
+  {
+    GNUNET_STATISTICS_update (GSF_stats,
+                             gettext_noop ("# stream client connections rejected"), 1,
+                             GNUNET_NO);
+    return GNUNET_SYSERR;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Accepting inbound stream connection from `%s'\n",
+             GNUNET_i2s (initiator));
   GNUNET_STATISTICS_update (GSF_stats,
                            gettext_noop ("# stream connections active"), 1,
                            GNUNET_NO);
@@ -1066,6 +1269,8 @@ accept_cb (void *cls,
   GNUNET_CONTAINER_DLL_insert (sc_head,
                               sc_tail,
                               sc);
+  sc_count++;
+  refresh_timeout_task (sc);
   return GNUNET_OK;
 }
 
@@ -1077,10 +1282,17 @@ void
 GSF_stream_start ()
 {
   stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
-  listen_socket = GNUNET_STREAM_listen (GSF_cfg,
-                                       GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
-                                       &accept_cb, NULL,
-                                       GNUNET_STREAM_OPTION_END);
+  if (GNUNET_YES ==
+      GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
+                                            "fs",
+                                            "MAX_STREAM_CLIENTS",
+                                            &sc_count_max))
+  {
+    listen_socket = GNUNET_STREAM_listen (GSF_cfg,
+                                         GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+                                         &accept_cb, NULL,
+                                         GNUNET_STREAM_OPTION_END);
+  } 
 }