+ while ( (NULL != io_handle->messages[packet]) &&
+ (socket->receiver_window_available
+ >= ntohs (io_handle->messages[packet]->header.header.size)) &&
+ (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
+ {
+ socket->receiver_window_available -=
+ ntohs (io_handle->messages[packet]->header.header.size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Placing DATA message with sequence %u in send queue\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (io_handle->messages[packet]->sequence_number));
+ copy_and_queue_message (socket,
+ &io_handle->messages[packet]->header,
+ NULL,
+ NULL);
+ packet++;
+ }
+ io_handle->packets_sent = packet;
+ // FIXME: 8s is not good, should use GNUNET_TIME_STD_BACKOFF...
+ if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
+ socket->data_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 8),
+ &data_retransmission_task,
+ socket);
+}
+
+
+/**
+ * Task for calling the read processor
+ *
+ * @param cls the socket
+ * @param tc the task context
+ */
+static void
+call_read_processor (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+ struct GNUNET_STREAM_IOReadHandle *read_handle;
+ size_t read_size;
+ size_t valid_read_size;
+ unsigned int packet;
+ uint32_t sequence_increase;
+ uint32_t offset_increase;
+
+ read_handle = socket->read_handle;
+ GNUNET_assert (NULL != read_handle);
+ read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
+ if (NULL == socket->receive_buffer)
+ return;
+ GNUNET_assert (NULL != socket->read_handle);
+ GNUNET_assert (NULL != socket->read_handle->proc);
+ /* Check the bitmap for any holes */
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
+ packet))
+ break;
+ }
+ /* We only call read processor if we have the first packet */
+ GNUNET_assert (0 < packet);
+ valid_read_size =
+ socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
+ GNUNET_assert (0 != valid_read_size);
+ /* Cancel the read_io_timeout_task */
+ GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
+ read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ /* Call the data processor */
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
+ GNUNET_i2s (&socket->other_peer));
+ read_size =
+ socket->read_handle->proc (socket->read_handle->proc_cls,
+ socket->status,
+ socket->receive_buffer + socket->copy_offset,
+ valid_read_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
+ GNUNET_i2s (&socket->other_peer), read_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
+ GNUNET_i2s (&socket->other_peer));
+ /* Free the read handle */
+ GNUNET_free (socket->read_handle);
+ socket->read_handle = NULL;
+ GNUNET_assert (read_size <= valid_read_size);
+ socket->copy_offset += read_size;
+ /* Determine upto which packet we can remove from the buffer */
+ for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
+ { packet++; break; }
+ if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
+ break;
+ }
+ /* If no packets can be removed we can't move the buffer */
+ if (0 == packet)
+ return;
+ sequence_increase = packet;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Sequence increase after read processor completion: %u\n",
+ GNUNET_i2s (&socket->other_peer), sequence_increase);
+ /* Shift the data in the receive buffer */
+ socket->receive_buffer =
+ memmove (socket->receive_buffer,
+ socket->receive_buffer
+ + socket->receive_buffer_boundaries[sequence_increase-1],
+ socket->receive_buffer_size
+ - socket->receive_buffer_boundaries[sequence_increase-1]);
+ /* Shift the bitmap */
+ socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
+ /* Set read_sequence_number */
+ socket->read_sequence_number += sequence_increase;
+ /* Set read_offset */
+ offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
+ socket->read_offset += offset_increase;
+ /* Fix copy_offset */
+ GNUNET_assert (offset_increase <= socket->copy_offset);
+ socket->copy_offset -= offset_increase;
+ /* Fix relative boundaries */
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
+ {
+ uint32_t ahead_buffer_boundary;
+
+ ahead_buffer_boundary =
+ socket->receive_buffer_boundaries[packet + sequence_increase];
+ if (0 == ahead_buffer_boundary)
+ socket->receive_buffer_boundaries[packet] = 0;
+ else
+ {
+ GNUNET_assert (offset_increase < ahead_buffer_boundary);
+ socket->receive_buffer_boundaries[packet] =
+ ahead_buffer_boundary - offset_increase;
+ }