}
+/**
+ * Task for calling the read processor
+ *
+ * @param cls the socket
+ * @param tc the task context
+ */
+static void
+call_read_processor_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+ size_t read_size;
+ size_t valid_read_size;
+
+ if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) return;
+
+ GNUNET_assert (NULL != socket->read_handle);
+ GNUNET_assert (NULL != socket->read_handle->proc);
+ GNUNET_assert (NULL != socket->copy_buffer);
+ GNUNET_assert (0 != socket->copy_buffer_size);
+
+ valid_read_size = socket->copy_buffer_size - socket->copy_buffer_read_offset;
+ GNUNET_assert (0 != valid_read_size);
+
+ read_size = socket->read_handle->proc (socket->read_handle->proc_cls,
+ socket->status,
+ socket->copy_buffer
+ + socket->copy_buffer_read_offset,
+ valid_read_size);
+
+ GNUNET_assert (read_size <= valid_read_size);
+ socket->copy_buffer_read_offset += read_size;
+
+ /* Free the copy buffer once it has been read entirely */
+ if (socket->copy_buffer_read_offset == socket->copy_buffer_size)
+ {
+ GNUNET_free (socket->copy_buffer);
+ socket->copy_buffer = NULL;
+ socket->copy_buffer_size = 0;
+ socket->copy_buffer_read_offset = 0;
+ }
+
+ /* Free the read handle */
+ GNUNET_free (socket->read_handle);
+ socket->read_handle = NULL;
+}
+
+
+/**
+ * Prepares the receive buffer for possible reads; Should only be called when
+ * there is a valid READ io request pending and socket->copy_buffer is empty
+ *
+ * @param socket the socket pointer
+ */
+static void
+prepare_buffer_for_read (struct GNUNET_STREAM_Socket *socket)
+{
+ unsigned int packet;
+ uint32_t offset_increase;
+ uint32_t sequence_increase;
+
+ GNUNET_assert (NULL == socket->copy_buffer);
+ GNUNET_assert (NULL != socket->read_handle);
+
+ /* Check the bitmap for any holes */
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
+ packet))
+ break;
+ }
+
+ sequence_increase = packet;
+
+ if (0 == sequence_increase) /* The first packet is still missing */
+ {
+ return;
+ }
+
+ /* Copy data to copy buffer */
+ GNUNET_assert (0 < socket->receive_buffer_boundaries[sequence_increase-1]);
+ socket->copy_buffer =
+ GNUNET_malloc (socket->receive_buffer_boundaries[sequence_increase-1]);
+ memcpy (socket->copy_buffer,
+ socket->receive_buffer,
+ socket->receive_buffer_boundaries[sequence_increase-1]);
+
+ /* Shift the data in the receive buffer */
+ memmove (socket->receive_buffer,
+ socket->receive_buffer
+ + socket->receive_buffer_boundaries[sequence_increase-1],
+ socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
+
+ /* Shift the bitmap */
+ socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
+
+ /* Set read_sequence_number */
+ socket->read_sequence_number += sequence_increase;
+
+ /* Set read_offset */
+ offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
+ socket->read_offset += offset_increase;
+
+ /* Fix relative boundaries */
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
+ {
+ socket->receive_buffer_boundaries[packet] =
+ socket->receive_buffer_boundaries[packet + sequence_increase]
+ - offset_increase;
+ }
+ else
+ socket->receive_buffer_boundaries[packet] = 0;
+ }
+
+ GNUNET_SCHEDULER_add_continuation (&call_read_processor_task,
+ socket,
+ GNUNET_SCHEDULER_REASON_PREREQ_DONE);
+}
+
+
/**
* Handler for DATA messages; Same for both client and server
*
&ack_task,
socket);
}
+
+ if ((NULL != socket->read_handle) /* A read handle is waiting */
+ && (NULL == socket->copy_buffer)) /* And the copy buffer is empty */
+ {
+ prepare_buffer_for_read (socket);
+ }
break;
}
-/**
- * Task for calling the read processor
- *
- * @param cls the socket
- */
-static void
-call_read_processor_task (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct GNUNET_STREAM_Socket *socket = cls;
- size_t read_size;
- size_t valid_read_size;
-
- if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) return;
-
- GNUNET_assert (NULL != socket->read_handle);
- GNUNET_assert (NULL != socket->read_handle->proc);
- GNUNET_assert (NULL != socket->copy_buffer);
- GNUNET_assert (0 != socket->copy_buffer_size);
-
- valid_read_size = socket->copy_buffer_size - socket->copy_buffer_read_offset;
- GNUNET_assert (0 != valid_read_size);
-
- read_size = socket->read_handle->proc (socket->read_handle->proc_cls,
- socket->status,
- socket->copy_buffer
- + socket->copy_buffer_read_offset,
- valid_read_size);
-
- GNUNET_assert (read_size <= valid_read_size);
- socket->copy_buffer_read_offset += read_size;
-
- /* Free the copy buffer once it has been read entirely */
- if (socket->copy_buffer_read_offset == socket->copy_buffer_size)
- {
- GNUNET_free (socket->copy_buffer);
- socket->copy_buffer = NULL;
- socket->copy_buffer_size = 0;
- socket->copy_buffer_read_offset = 0;
- }
-
- /* Free the read handle */
- GNUNET_free (socket->read_handle);
- socket->read_handle = NULL;
-}
-
-
/*****************/
/* API functions */
/*****************/
GNUNET_STREAM_DataProcessor proc,
void *proc_cls)
{
- unsigned int packet;
struct GNUNET_STREAM_IOReadHandle *read_handle;
- uint32_t offset_increase;
- uint32_t sequence_increase;
/* Return NULL if there is already a read handle; the user has to cancel that
first before continuing or has to wait until it is completed */
/* if previous copy buffer is still not read call the data processor on it */
if (NULL != socket->copy_buffer)
- {
- GNUNET_SCHEDULER_add_now (&call_read_processor_task,
- socket);
- }
-
- /* Check the bitmap for any holes */
- for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
- {
- if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
- packet))
- break;
- }
-
- sequence_increase = packet;
-
- if (0 == sequence_increase) /* The first packet is still missing */
- {
- /* We can't do anything until it arrives */
- }
+ GNUNET_SCHEDULER_add_continuation (&call_read_processor_task,
+ socket,
+ GNUNET_SCHEDULER_REASON_PREREQ_DONE);
else
- {
- /* Copy data to copy buffer */
- GNUNET_assert (0 < socket->receive_buffer_boundaries[sequence_increase-1]);
- socket->copy_buffer =
- GNUNET_malloc (socket->receive_buffer_boundaries[sequence_increase-1]);
-
- /* Shift the data in the receive buffer */
- memmove (socket->receive_buffer,
- socket->receive_buffer
- + socket->receive_buffer_boundaries[sequence_increase-1],
- socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
-
- /* Shift the bitmap */
- socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
-
- /* Set read_sequence_number */
- socket->read_sequence_number += sequence_increase;
-
- /* Set read_offset */
- offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
- socket->read_offset += offset_increase;
-
- /* Fix relative boundaries */
- for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
- {
- if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
- {
- socket->receive_buffer_boundaries[packet] =
- socket->receive_buffer_boundaries[packet + sequence_increase]
- - offset_increase;
- }
- else
- socket->receive_buffer_boundaries[packet] = 0;
- }
- }
+ prepare_buffer_for_read (socket);
return read_handle;
}