-ensure that either stream_api calls callbacks last or that we don't destroy a stream...
authorChristian Grothoff <christian@grothoff.org>
Sat, 24 Nov 2012 22:17:15 +0000 (22:17 +0000)
committerChristian Grothoff <christian@grothoff.org>
Sat, 24 Nov 2012 22:17:15 +0000 (22:17 +0000)
src/fs/gnunet-service-fs_stream.c
src/include/gnunet_stream_lib.h
src/stream/stream_api.c

index 856a21a1a611366ecf6a4e3796da55b4528b7934..b444e282ccf43b39f7726df351cc73b63b80c4a5 100644 (file)
@@ -25,7 +25,6 @@
  *
  * TODO:
  * - limit # concurrent clients, have timeouts for server-side
- * - stream shutdown in callbacks from stream may not always work right now (check with stream_api!)
  */
 #include "platform.h"
 #include "gnunet_constants.h"
@@ -77,6 +76,11 @@ struct StreamClient
    */
   struct GNUNET_DATASTORE_QueueEntry *qe;
 
+  /**
+   * Task that is scheduled to asynchronously terminate the connection.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier terminate_task;
+
   /**
    * Size of the last write that was initiated.
    */ 
@@ -247,6 +251,13 @@ struct StreamHandle
    */
   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
 
+  /**
+   * Task to reset streams that had errors (asynchronously,
+   * as we may not be able to do it immediately during a
+   * callback from the stream API).
+   */
+  GNUNET_SCHEDULER_TaskIdentifier reset_task;
+
   /**
    * Is this stream ready for transmission?
    */
@@ -377,6 +388,55 @@ reset_stream (struct StreamHandle *sh)
 }
 
 
+/**
+ * Task called when it is time to destroy an inactive stream.
+ *
+ * @param cls the 'struct StreamHandle' to tear down
+ * @param tc scheduler context, unused
+ */
+static void
+stream_timeout (void *cls,
+               const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct StreamHandle *sh = cls;
+
+  sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  destroy_stream_handle (sh);
+}
+
+
+/**
+ * Task called when it is time to reset an stream.
+ *
+ * @param cls the 'struct StreamHandle' to tear down
+ * @param tc scheduler context, unused
+ */
+static void
+reset_stream_task (void *cls,
+                  const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct StreamHandle *sh = cls;
+
+  sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
+  reset_stream (sh);
+}
+
+
+/**
+ * We had a serious error, tear down and re-create stream from scratch,
+ * but do so asynchronously.
+ *
+ * @param sh stream to reset
+ */
+static void
+reset_stream_async (struct StreamHandle *sh)
+{
+  if (GNUNET_SCHEDULER_NO_TASK == sh->reset_task)
+    sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
+                                              sh);
+}
+
+
 /**
  * We got a reply from the stream.  Process it.
  *
@@ -403,7 +463,7 @@ handle_stream_reply (void *cls,
                                 GNUNET_NO, GNUNET_NO))
   {
     GNUNET_break_op (0);
-    reset_stream (sh);
+    reset_stream_async (sh);
     return size;
   }
   sh->rh = GNUNET_STREAM_read (sh->stream,
@@ -513,6 +573,7 @@ reply_cb (void *cls,
     if (sizeof (struct StreamReplyMessage) > msize)
     {
       GNUNET_break_op (0);
+      reset_stream_async (sh);
       return GNUNET_SYSERR;
     }
     srm = (const struct StreamReplyMessage *) message;
@@ -523,7 +584,8 @@ reply_cb (void *cls,
                              type,
                              &srm[1], msize, &query))
     {
-      GNUNET_break_op (0);
+      GNUNET_break_op (0); 
+      reset_stream_async (sh);
       return GNUNET_SYSERR;
     }
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -553,6 +615,7 @@ reply_cb (void *cls,
     return GNUNET_OK;
   default:
     GNUNET_break_op (0);
+    reset_stream_async (sh);
     return GNUNET_SYSERR;
   }
 }
@@ -632,23 +695,6 @@ GSF_stream_query (const struct GNUNET_PeerIdentity *target,
 }
 
 
-/**
- * Task called when it is time to destroy an inactive stream.
- *
- * @param cls the 'struct StreamHandle' to tear down
- * @param tc scheduler context, unused
- */
-static void
-stream_timeout (void *cls,
-               const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct StreamHandle *sh = cls;
-
-  sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
-  destroy_stream_handle (sh);
-}
-
-
 /**
  * Cancel an active request; must not be called after 'proc'
  * was calld.
@@ -691,7 +737,9 @@ terminate_stream (struct StreamClient *sc)
   GNUNET_STATISTICS_update (GSF_stats,
                            gettext_noop ("# stream connections active"), -1,
                            GNUNET_NO);
-  if (NULL != sc->rh)
+  if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
+    GNUNET_SCHEDULER_cancel (sc->terminate_task); 
+ if (NULL != sc->rh)
     GNUNET_STREAM_io_read_cancel (sc->rh);
   if (NULL != sc->wh)
     GNUNET_STREAM_io_write_cancel (sc->wh);
@@ -706,6 +754,38 @@ terminate_stream (struct StreamClient *sc)
 }
 
 
+/**
+ * Task run to asynchronously terminate the stream.
+ *
+ * @param cls the 'struct StreamClient'
+ * @param tc scheduler context
+ */ 
+static void
+terminate_stream_task (void *cls,
+                      const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct StreamClient *sc = cls;
+
+  sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
+  terminate_stream (sc);
+}
+
+
+/**
+ * We had a serious error, termiante stream,
+ * but do so asynchronously.
+ *
+ * @param sc stream to reset
+ */
+static void
+terminate_stream_async (struct StreamClient *sc)
+{
+  if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
+    sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
+                                                  sc);
+}
+
+
 /**
  * Functions of this signature are called whenever data is available from the
  * stream.
@@ -782,7 +862,7 @@ process_request (void *cls,
     if (GNUNET_SYSERR == ret)
     {
       GNUNET_break_op (0);
-      terminate_stream (sc);
+      terminate_stream_async (sc);
       return size;
     }
     break;
@@ -790,7 +870,7 @@ process_request (void *cls,
   case GNUNET_STREAM_SHUTDOWN:
   case GNUNET_STREAM_SYSERR:
   case GNUNET_STREAM_BROKEN:
-    terminate_stream (sc);
+    terminate_stream_async (sc);
     return size;
   default:
     GNUNET_break (0);
@@ -922,6 +1002,7 @@ request_cb (void *cls,
        ntohs (message->size))
     {
       GNUNET_break_op (0);
+      terminate_stream_async (sc);
       return GNUNET_SYSERR;
     }
     sqm = (const struct StreamQueryMessage *) message;
@@ -944,6 +1025,7 @@ request_cb (void *cls,
     return GNUNET_OK;
   default:
     GNUNET_break_op (0);
+    terminate_stream_async (sc);
     return GNUNET_SYSERR;
   }
 }
index b7e3e4ce27bfb53c82e615682208024f0dd263d1..d8121836496e10451eda9ba1f71773617034e3cc 100644 (file)
@@ -246,12 +246,10 @@ struct GNUNET_STREAM_ListenSocket;
  * Listens for stream connections for a specific application ports
  *
  * @param cfg the configuration to use
- *
  * @param app_port the application port for which new streams will be
  *         accepted. If another stream is listening on the same port the
  *         listen_cb will be called to signal binding error and the returned
  *         ListenSocket will be invalidated.
- *
  * @param listen_cb this function will be called when a peer tries to establish
  *            a stream with us
  * @param listen_cb_cls closure for listen_cb
index 46f7abb4764da33a599b4b65a6419483321c5f52..f9152749df22564249b9f19778d2ddef83b7a77a 100644 (file)
@@ -1917,13 +1917,16 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket,
      that that stream has been shutdown */
   if (NULL != socket->write_handle)
   {
-    // FIXME: this breaks if 'write_cont' decides to 
-    // call SOCKET_close!
-    if (NULL != socket->write_handle->write_cont)
-      socket->write_handle->write_cont (socket->write_handle->write_cont_cls,
-                                        GNUNET_STREAM_SHUTDOWN, 0);
+    GNUNET_STREAM_CompletionContinuation wc;
+    void *wc_cls;
+
+    wc = socket->write_handle->write_cont;
+    wc_cls = socket->write_handle->write_cont_cls;
     GNUNET_STREAM_io_write_cancel (socket->write_handle);
     socket->write_handle = NULL;
+    if (NULL != wc)
+      wc (wc_cls,
+         GNUNET_STREAM_SHUTDOWN, 0);
   }
   return GNUNET_OK;
 }
@@ -2041,13 +2044,16 @@ handle_close (struct GNUNET_STREAM_Socket *socket,
      that that stream has been shutdown */
   if (NULL != socket->write_handle)
   {
-    // FIXME: this breaks if 'write_cont' decides to 
-    // call SOCKET_close!
-    if (NULL != socket->write_handle->write_cont)
-      socket->write_handle->write_cont (socket->write_handle->write_cont_cls,
-                                        GNUNET_STREAM_SHUTDOWN, 0);
+    GNUNET_STREAM_CompletionContinuation wc;
+    void *wc_cls;
+
+    wc = socket->write_handle->write_cont;
+    wc_cls = socket->write_handle->write_cont_cls;
     GNUNET_STREAM_io_write_cancel (socket->write_handle);
     socket->write_handle = NULL;
+    if (NULL != wc)
+      wc (wc_cls,
+         GNUNET_STREAM_SHUTDOWN, 0);
   }
   return GNUNET_OK;
 }