From: Sree Harsha Totakura Date: Wed, 21 Mar 2012 12:32:55 +0000 (+0000) Subject: fixed read timeout problem and added ack sending incase of ignored data messages X-Git-Tag: initial-import-from-subversion-38251~14176 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=e83f713b6cd9795ca576402682448f0c54879331;p=oweals%2Fgnunet.git fixed read timeout problem and added ack sending incase of ignored data messages --- diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 1547f0228..2b9363c68 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -845,11 +845,18 @@ call_read_processor (void *cls, socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; /* Call the data processor */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Calling read processor\n", + socket->our_id); read_size = socket->read_handle->proc (socket->read_handle->proc_cls, socket->status, socket->receive_buffer + socket->copy_offset, valid_read_size); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Read processor read %d bytes\n", + socket->our_id, + read_size); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%x: Read processor completed successfully\n", socket->our_id); @@ -917,17 +924,29 @@ read_io_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_STREAM_Socket *socket = cls; + GNUNET_STREAM_DataProcessor proc; + void *proc_cls; socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Read task timedout - Cancelling it\n", + socket->our_id); GNUNET_SCHEDULER_cancel (socket->read_task_id); socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; } GNUNET_assert (NULL != socket->read_handle); - + proc = socket->read_handle->proc; + proc_cls = socket->read_handle->proc_cls; + GNUNET_free (socket->read_handle); socket->read_handle = NULL; + /* Call the read processor to signal timeout */ + proc (proc_cls, + GNUNET_STREAM_TIMEOUT, + NULL, + 0); } @@ -986,9 +1005,18 @@ handle_data (struct GNUNET_STREAM_Socket *socket, "%x: Ignoring received message with sequence number %u\n", socket->our_id, ntohl (msg->sequence_number)); + /* Start ACK sending task if one is not already present */ + if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) + { + socket->ack_task_id = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh + (msg->ack_deadline), + &ack_task, + socket); + } return GNUNET_YES; } - + /* Check if we have already seen this message */ if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, relative_sequence_number)) @@ -998,6 +1026,15 @@ handle_data (struct GNUNET_STREAM_Socket *socket, "number %u\n", socket->our_id, ntohl (msg->sequence_number)); + /* Start ACK sending task if one is not already present */ + if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) + { + socket->ack_task_id = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh + (msg->ack_deadline), + &ack_task, + socket); + } return GNUNET_YES; } @@ -1063,6 +1100,10 @@ handle_data (struct GNUNET_STREAM_Socket *socket, && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0))) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%x: Scheduling read processor\n", + socket->our_id); + socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor, socket); @@ -1864,12 +1905,13 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, socket->our_id); return GNUNET_OK; } - + /* FIXME: increment in the base sequence number is breaking current flow + */ if (!((socket->write_sequence_number - htonl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received DATA_ACK with unexpected base sequence", + "%x: Received DATA_ACK with unexpected base sequence " "number\n", socket->our_id); return GNUNET_OK; @@ -2532,14 +2574,19 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, struct GNUNET_STREAM_IOReadHandle *read_handle; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s()\n", __func__); + "%x: %s()\n", + socket->our_id, + __func__); /* Return NULL if there is already a read handle; the user has to cancel that first before continuing or has to wait until it is completed */ if (NULL != socket->read_handle) return NULL; + GNUNET_assert (NULL != proc); + read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle)); read_handle->proc = proc; + read_handle->proc_cls = proc_cls; socket->read_handle = read_handle; /* Check if we have a packet at bitmap 0 */ @@ -2556,7 +2603,9 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, &read_io_timeout, socket); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s() END\n", __func__); + "%x: %s() END\n", + socket->our_id, + __func__); return read_handle; } @@ -2569,6 +2618,7 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, void GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh) { + /* FIXME: Should cancel the write retransmission task */ return; } diff --git a/src/stream/test_stream_local.c b/src/stream/test_stream_local.c index 9a6c13da6..535ee62a2 100644 --- a/src/stream/test_stream_local.c +++ b/src/stream/test_stream_local.c @@ -287,11 +287,26 @@ input_processor (void *cls, peer = (struct PeerData *) cls; + if (GNUNET_STREAM_TIMEOUT == status) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Read operation timedout - reading again!\n"); + GNUNET_assert (0 == size); + peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) + peer->socket, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &input_processor, + cls); + GNUNET_assert (NULL != peer->io_read_handle); + return 0; + } + GNUNET_assert (GNUNET_STREAM_OK == status); - GNUNET_assert (size < strlen (data)); - GNUNET_assert (strncmp ((const char *) data + peer->bytes_read, - (const char *) input_data, - size)); + GNUNET_assert (size <= strlen (data)); + GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read, + (const char *) input_data, + size)); peer->bytes_read += size; if (peer->bytes_read < strlen (data))