*
* TODO:
* - limit # concurrent clients, have timeouts for server-side
- * - stream shutdown in callbacks from stream may not always work right now (check with stream_api!)
*/
#include "platform.h"
#include "gnunet_constants.h"
*/
struct GNUNET_DATASTORE_QueueEntry *qe;
+ /**
+ * Task that is scheduled to asynchronously terminate the connection.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier terminate_task;
+
/**
* Size of the last write that was initiated.
*/
*/
GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+ /**
+ * Task to reset streams that had errors (asynchronously,
+ * as we may not be able to do it immediately during a
+ * callback from the stream API).
+ */
+ GNUNET_SCHEDULER_TaskIdentifier reset_task;
+
/**
* Is this stream ready for transmission?
*/
}
+/**
+ * Task called when it is time to destroy an inactive stream.
+ *
+ * @param cls the 'struct StreamHandle' to tear down
+ * @param tc scheduler context, unused
+ */
+static void
+stream_timeout (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct StreamHandle *sh = cls;
+
+ sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ destroy_stream_handle (sh);
+}
+
+
+/**
+ * Task called when it is time to reset an stream.
+ *
+ * @param cls the 'struct StreamHandle' to tear down
+ * @param tc scheduler context, unused
+ */
+static void
+reset_stream_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct StreamHandle *sh = cls;
+
+ sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
+ reset_stream (sh);
+}
+
+
+/**
+ * We had a serious error, tear down and re-create stream from scratch,
+ * but do so asynchronously.
+ *
+ * @param sh stream to reset
+ */
+static void
+reset_stream_async (struct StreamHandle *sh)
+{
+ if (GNUNET_SCHEDULER_NO_TASK == sh->reset_task)
+ sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
+ sh);
+}
+
+
/**
* We got a reply from the stream. Process it.
*
GNUNET_NO, GNUNET_NO))
{
GNUNET_break_op (0);
- reset_stream (sh);
+ reset_stream_async (sh);
return size;
}
sh->rh = GNUNET_STREAM_read (sh->stream,
if (sizeof (struct StreamReplyMessage) > msize)
{
GNUNET_break_op (0);
+ reset_stream_async (sh);
return GNUNET_SYSERR;
}
srm = (const struct StreamReplyMessage *) message;
type,
&srm[1], msize, &query))
{
- GNUNET_break_op (0);
+ GNUNET_break_op (0);
+ reset_stream_async (sh);
return GNUNET_SYSERR;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
return GNUNET_OK;
default:
GNUNET_break_op (0);
+ reset_stream_async (sh);
return GNUNET_SYSERR;
}
}
}
-/**
- * Task called when it is time to destroy an inactive stream.
- *
- * @param cls the 'struct StreamHandle' to tear down
- * @param tc scheduler context, unused
- */
-static void
-stream_timeout (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct StreamHandle *sh = cls;
-
- sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- destroy_stream_handle (sh);
-}
-
-
/**
* Cancel an active request; must not be called after 'proc'
* was calld.
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# stream connections active"), -1,
GNUNET_NO);
- if (NULL != sc->rh)
+ if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
+ GNUNET_SCHEDULER_cancel (sc->terminate_task);
+ if (NULL != sc->rh)
GNUNET_STREAM_io_read_cancel (sc->rh);
if (NULL != sc->wh)
GNUNET_STREAM_io_write_cancel (sc->wh);
}
+/**
+ * Task run to asynchronously terminate the stream.
+ *
+ * @param cls the 'struct StreamClient'
+ * @param tc scheduler context
+ */
+static void
+terminate_stream_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct StreamClient *sc = cls;
+
+ sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
+ terminate_stream (sc);
+}
+
+
+/**
+ * We had a serious error, termiante stream,
+ * but do so asynchronously.
+ *
+ * @param sc stream to reset
+ */
+static void
+terminate_stream_async (struct StreamClient *sc)
+{
+ if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
+ sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
+ sc);
+}
+
+
/**
* Functions of this signature are called whenever data is available from the
* stream.
if (GNUNET_SYSERR == ret)
{
GNUNET_break_op (0);
- terminate_stream (sc);
+ terminate_stream_async (sc);
return size;
}
break;
case GNUNET_STREAM_SHUTDOWN:
case GNUNET_STREAM_SYSERR:
case GNUNET_STREAM_BROKEN:
- terminate_stream (sc);
+ terminate_stream_async (sc);
return size;
default:
GNUNET_break (0);
ntohs (message->size))
{
GNUNET_break_op (0);
+ terminate_stream_async (sc);
return GNUNET_SYSERR;
}
sqm = (const struct StreamQueryMessage *) message;
return GNUNET_OK;
default:
GNUNET_break_op (0);
+ terminate_stream_async (sc);
return GNUNET_SYSERR;
}
}
that that stream has been shutdown */
if (NULL != socket->write_handle)
{
- // FIXME: this breaks if 'write_cont' decides to
- // call SOCKET_close!
- if (NULL != socket->write_handle->write_cont)
- socket->write_handle->write_cont (socket->write_handle->write_cont_cls,
- GNUNET_STREAM_SHUTDOWN, 0);
+ GNUNET_STREAM_CompletionContinuation wc;
+ void *wc_cls;
+
+ wc = socket->write_handle->write_cont;
+ wc_cls = socket->write_handle->write_cont_cls;
GNUNET_STREAM_io_write_cancel (socket->write_handle);
socket->write_handle = NULL;
+ if (NULL != wc)
+ wc (wc_cls,
+ GNUNET_STREAM_SHUTDOWN, 0);
}
return GNUNET_OK;
}
that that stream has been shutdown */
if (NULL != socket->write_handle)
{
- // FIXME: this breaks if 'write_cont' decides to
- // call SOCKET_close!
- if (NULL != socket->write_handle->write_cont)
- socket->write_handle->write_cont (socket->write_handle->write_cont_cls,
- GNUNET_STREAM_SHUTDOWN, 0);
+ GNUNET_STREAM_CompletionContinuation wc;
+ void *wc_cls;
+
+ wc = socket->write_handle->write_cont;
+ wc_cls = socket->write_handle->write_cont_cls;
GNUNET_STREAM_io_write_cancel (socket->write_handle);
socket->write_handle = NULL;
+ if (NULL != wc)
+ wc (wc_cls,
+ GNUNET_STREAM_SHUTDOWN, 0);
}
return GNUNET_OK;
}