* Task scheduled to continue a read operation.
*/
GNUNET_SCHEDULER_TaskIdentifier read_task_id;
+
+ /**
+ * Task scheduled from GNUNET_STREAM_read() to lookup the ACK bitmap and call
+ * the read processor task
+ */
+ GNUNET_SCHEDULER_TaskIdentifier probe_data_availability_task_id;
};
}
+/**
+ * Cleansup the sockets read handle
+ *
+ * @param socket the socket whose read handle has to be cleanedup
+ */
+static void
+cleanup_read_handle (struct GNUNET_STREAM_Socket *socket)
+{
+ struct GNUNET_STREAM_IOReadHandle *read_handle;
+
+ read_handle = socket->read_handle;
+ /* Read io time task should be there; if it is already executed then this
+ read handle is not valid; However upon scheduler shutdown the read io task
+ may be executed before */
+ if (GNUNET_SCHEDULER_NO_TASK != read_handle->read_io_timeout_task_id)
+ GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
+ /* reading task may be present; if so we have to stop it */
+ if (GNUNET_SCHEDULER_NO_TASK != read_handle->read_task_id)
+ GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
+ if (GNUNET_SCHEDULER_NO_TASK != read_handle->probe_data_availability_task_id)
+ GNUNET_SCHEDULER_cancel (read_handle->probe_data_availability_task_id);
+ GNUNET_free (read_handle);
+ socket->read_handle = NULL;
+}
+
+
/**
* Task for calling the read processor
*
{
struct GNUNET_STREAM_Socket *socket = cls;
struct GNUNET_STREAM_IOReadHandle *read_handle;
+ GNUNET_STREAM_DataProcessor proc;
+ void *proc_cls;
size_t read_size;
size_t valid_read_size;
unsigned int packet;
return;
if (NULL == socket->receive_buffer)
return;
- GNUNET_assert (NULL != socket->read_handle);
- GNUNET_assert (NULL != socket->read_handle->proc);
+ GNUNET_assert (NULL != read_handle->proc);
/* Check the bitmap for any holes */
for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
{
valid_read_size =
socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
GNUNET_assert (0 != valid_read_size);
- /* Cancel the read_io_timeout_task */
- GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
- read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ proc = read_handle->proc;
+ proc_cls = read_handle->proc_cls;
+ cleanup_read_handle (socket);
/* Call the data processor */
LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
GNUNET_i2s (&socket->other_peer));
- read_size =
- socket->read_handle->proc (socket->read_handle->proc_cls,
- socket->status,
- socket->receive_buffer + socket->copy_offset,
- valid_read_size);
+ read_size = proc (proc_cls, socket->status,
+ socket->receive_buffer + socket->copy_offset,
+ valid_read_size);
LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
GNUNET_i2s (&socket->other_peer), read_size);
LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
GNUNET_i2s (&socket->other_peer));
- /* Free the read handle */
- GNUNET_free (socket->read_handle);
- socket->read_handle = NULL;
GNUNET_assert (read_size <= valid_read_size);
socket->copy_offset += read_size;
/* Determine upto which packet we can remove from the buffer */
/**
- * Tries to read data from the stream.
+ * Function to check the ACK bitmap for any received messages and call the data processor
+ *
+ * @param cls the socket
+ * @param tc the scheduler task context
+ */
+static void
+probe_data_availability (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ GNUNET_assert (NULL != socket->read_handle);
+ socket->read_handle->probe_data_availability_task_id =
+ GNUNET_SCHEDULER_NO_TASK;
+ if (GNUNET_SCHEDULER_NO_TASK != socket->read_handle->read_task_id)
+ return; /* A task to call read processor is present */
+ if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
+ 0))
+ socket->read_handle->read_task_id
+ = GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
+}
+
+
+
+/**
+ * Tries to read data from the stream. Should not be called when another read
+ * handle is present; the existing read handle should be canceled with
+ * GNUNET_STREAM_io_read_cancel(). Only one read handle per socket is present at any time
*
* @param socket the socket representing a stream
* @param timeout the timeout period
first before continuing or has to wait until it is completed */
if (NULL != socket->read_handle)
{
- GNUNET_break (0);
+ GNUNET_assert (0);
return NULL;
}
GNUNET_assert (NULL != proc);
read_handle->proc_cls = proc_cls;
read_handle->socket = socket;
socket->read_handle = read_handle;
- if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
- 0))
- read_handle->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
- socket);
+ read_handle->probe_data_availability_task_id =
+ GNUNET_SCHEDULER_add_now (&probe_data_availability, socket);
read_handle->read_io_timeout_task_id =
GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
socket = ioh->socket;
GNUNET_assert (NULL != socket->read_handle);
GNUNET_assert (ioh == socket->read_handle);
- /* Read io time task should be there; if it is already executed then this
- read handle is not valid; However upon scheduler shutdown the read io task
- may be executed before */
- if (GNUNET_SCHEDULER_NO_TASK != ioh->read_io_timeout_task_id)
- GNUNET_SCHEDULER_cancel (ioh->read_io_timeout_task_id);
- /* reading task may be present; if so we have to stop it */
- if (GNUNET_SCHEDULER_NO_TASK != ioh->read_task_id)
- GNUNET_SCHEDULER_cancel (ioh->read_task_id);
- GNUNET_free (ioh);
- socket->read_handle = NULL;
+ cleanup_read_handle (socket);
}
/* end of stream_api.c */