socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
/* Call the data processor */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%x: Calling read processor\n",
+ socket->our_id);
read_size =
socket->read_handle->proc (socket->read_handle->proc_cls,
socket->status,
socket->receive_buffer + socket->copy_offset,
valid_read_size);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%x: Read processor read %d bytes\n",
+ socket->our_id,
+ read_size);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%x: Read processor completed successfully\n",
socket->our_id);
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_STREAM_Socket *socket = cls;
+ GNUNET_STREAM_DataProcessor proc;
+ void *proc_cls;
socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%x: Read task timedout - Cancelling it\n",
+ socket->our_id);
GNUNET_SCHEDULER_cancel (socket->read_task_id);
socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
}
GNUNET_assert (NULL != socket->read_handle);
-
+ proc = socket->read_handle->proc;
+ proc_cls = socket->read_handle->proc_cls;
+
GNUNET_free (socket->read_handle);
socket->read_handle = NULL;
+ /* Call the read processor to signal timeout */
+ proc (proc_cls,
+ GNUNET_STREAM_TIMEOUT,
+ NULL,
+ 0);
}
"%x: Ignoring received message with sequence number %u\n",
socket->our_id,
ntohl (msg->sequence_number));
+ /* Start ACK sending task if one is not already present */
+ if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
+ {
+ socket->ack_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+ (msg->ack_deadline),
+ &ack_task,
+ socket);
+ }
return GNUNET_YES;
}
-
+
/* Check if we have already seen this message */
if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
relative_sequence_number))
"number %u\n",
socket->our_id,
ntohl (msg->sequence_number));
+ /* Start ACK sending task if one is not already present */
+ if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
+ {
+ socket->ack_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+ (msg->ack_deadline),
+ &ack_task,
+ socket);
+ }
return GNUNET_YES;
}
&& (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
0)))
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%x: Scheduling read processor\n",
+ socket->our_id);
+
socket->read_task_id =
GNUNET_SCHEDULER_add_now (&call_read_processor,
socket);
socket->our_id);
return GNUNET_OK;
}
-
+ /* FIXME: increment in the base sequence number is breaking current flow
+ */
if (!((socket->write_sequence_number
- htonl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Received DATA_ACK with unexpected base sequence",
+ "%x: Received DATA_ACK with unexpected base sequence "
"number\n",
socket->our_id);
return GNUNET_OK;
struct GNUNET_STREAM_IOReadHandle *read_handle;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s()\n", __func__);
+ "%x: %s()\n",
+ socket->our_id,
+ __func__);
/* Return NULL if there is already a read handle; the user has to cancel that
first before continuing or has to wait until it is completed */
if (NULL != socket->read_handle) return NULL;
+ GNUNET_assert (NULL != proc);
+
read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
read_handle->proc = proc;
+ read_handle->proc_cls = proc_cls;
socket->read_handle = read_handle;
/* Check if we have a packet at bitmap 0 */
&read_io_timeout,
socket);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s() END\n", __func__);
+ "%x: %s() END\n",
+ socket->our_id,
+ __func__);
return read_handle;
}
void
GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
{
+ /* FIXME: Should cancel the write retransmission task */
return;
}
peer = (struct PeerData *) cls;
+ if (GNUNET_STREAM_TIMEOUT == status)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Read operation timedout - reading again!\n");
+ GNUNET_assert (0 == size);
+ peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
+ peer->socket,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &input_processor,
+ cls);
+ GNUNET_assert (NULL != peer->io_read_handle);
+ return 0;
+ }
+
GNUNET_assert (GNUNET_STREAM_OK == status);
- GNUNET_assert (size < strlen (data));
- GNUNET_assert (strncmp ((const char *) data + peer->bytes_read,
- (const char *) input_data,
- size));
+ GNUNET_assert (size <= strlen (data));
+ GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read,
+ (const char *) input_data,
+ size));
peer->bytes_read += size;
if (peer->bytes_read < strlen (data))