static unsigned int default_timeout = 10;
+/**
+ * Function to print the contents of an address location. Used only for debugging
+ *
+ * @param ptr the address location; Should be more than 5 bytes long
+ */
+static void
+debug_print_contents (const void *ptr)
+{
+ /* const char *c; */
+
+ /* c = ptr; */
+ /* LOG (GNUNET_ERROR_TYPE_DEBUG, */
+ /* "--- contents: %u %u %u %u %u\n", c[0], c[1], c[2], c[3], c[4]); */
+}
+
+
/**
* Callback function for sending queued message
*
"%s: Placing DATA message with sequence %u in send queue\n",
GNUNET_i2s (&socket->other_peer),
ntohl (io_handle->messages[packet]->sequence_number));
+ debug_print_contents(&(io_handle->messages[packet][1]));
copy_and_queue_message (socket,
&io_handle->messages[packet]->header,
NULL,
"%s: Placing DATA message with sequence %u in send queue\n",
GNUNET_i2s (&socket->other_peer),
ntohl (io_handle->messages[packet]->sequence_number));
+ debug_print_contents(&(io_handle->messages[packet][1]));
copy_and_queue_message (socket,
&io_handle->messages[packet]->header,
NULL,
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Calling read processor\n",
GNUNET_i2s (&socket->other_peer));
+ debug_print_contents (socket->receive_buffer + socket->copy_offset);
read_size =
socket->read_handle->proc (socket->read_handle->proc_cls,
socket->status,
GNUNET_i2s (&socket->other_peer), sequence_increase);
/* Shift the data in the receive buffer */
- memmove (socket->receive_buffer,
- socket->receive_buffer
- + socket->receive_buffer_boundaries[sequence_increase-1],
- socket->receive_buffer_size
- - socket->receive_buffer_boundaries[sequence_increase-1]);
+ socket->receive_buffer =
+ memmove (socket->receive_buffer,
+ socket->receive_buffer
+ + socket->receive_buffer_boundaries[sequence_increase-1],
+ socket->receive_buffer_size
+ - socket->receive_buffer_boundaries[sequence_increase-1]);
/* Shift the bitmap */
socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
/* Set read_sequence_number */
{
if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
{
- socket->receive_buffer_boundaries[packet] =
- socket->receive_buffer_boundaries[packet + sequence_increase]
- - offset_increase;
+ uint32_t ahead_buffer_boundary;
+
+ ahead_buffer_boundary =
+ socket->receive_buffer_boundaries[packet + sequence_increase];
+ if (0 == ahead_buffer_boundary)
+ socket->receive_buffer_boundaries[packet] = 0;
+ else
+ {
+ GNUNET_assert (offset_increase < ahead_buffer_boundary);
+ socket->receive_buffer_boundaries[packet] =
+ ahead_buffer_boundary - offset_increase;
+ }
}
else
socket->receive_buffer_boundaries[packet] = 0;
/* Copy Data to buffer */
payload = &msg[1];
+ debug_print_contents(payload);
GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
memcpy (socket->receive_buffer + relative_offset,
payload,
}
else /* We have to call the write continuation callback now */
{
+ struct GNUNET_STREAM_IOWriteHandle *write_handle;
+
/* Free the packets */
for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
{
GNUNET_free_non_null (socket->write_handle->messages[packet]);
}
- if (NULL != socket->write_handle->write_cont)
- socket->write_handle->write_cont
- (socket->write_handle->write_cont_cls,
- socket->status,
- socket->write_handle->size);
+ write_handle = socket->write_handle;
+ socket->write_handle = NULL;
+ if (NULL != write_handle->write_cont)
+ write_handle->write_cont (write_handle->write_cont_cls,
+ socket->status,
+ write_handle->size);
+ /* We are done with the write handle - Freeing it */
+ GNUNET_free (write_handle);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Write completion callback completed\n",
- GNUNET_i2s (&socket->other_peer));
- /* We are done with the write handle - Freeing it */
- GNUNET_free (socket->write_handle);
- socket->write_handle = NULL;
+ GNUNET_i2s (&socket->other_peer));
}
break;
default:
GNUNET_STREAM_close (peer1.socket);
if (NULL != peer2.socket)
GNUNET_STREAM_close (peer2.socket);
+ if (NULL != peer2_listen_socket)
+ GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
if (0 != abort_task)
{
{
peer->io_write_handle =
GNUNET_STREAM_write (peer->socket,
- (void *) data,
+ ((void *) data) + peer->bytes_wrote,
DATA_SIZE - peer->bytes_wrote,
GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 5),
}
+/**
+ * Scheduler call back; to be executed when a new stream is connected
+ * Called from listen connect for peer2
+ */
+static void
+stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
/**
* Input processor
*
- * @param cls the closure from GNUNET_STREAM_write/read
+ * @param cls peer2
* @param status the status of the stream at the time this function is called
* @param data traffic from the other side
* @param size the number of bytes available in data read
struct PeerData *peer = cls;
GNUNET_assert (GNUNET_STREAM_OK == status);
+ GNUNET_assert (&peer2 == peer);
GNUNET_assert (size < DATA_SIZE);
- GNUNET_assert (memcmp (data + peer->bytes_read,
- input_data,
- size));
+ GNUNET_assert (0 == memcmp (((void *)data ) + peer->bytes_read,
+ input_data, size));
peer->bytes_read += size;
if (peer->bytes_read < DATA_SIZE)
{
- peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
- peer->socket,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &input_processor,
- cls);
- GNUNET_assert (NULL != peer->io_read_handle);
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task);
+ read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
+ /* peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) */
+ /* peer->socket, */
+ /* GNUNET_TIME_relative_multiply */
+ /* (GNUNET_TIME_UNIT_SECONDS, 5), */
+ /* &input_processor, */
+ /* cls); */
+ /* GNUNET_assert (NULL != peer->io_read_handle); */
}
else
{
stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_STREAM_Socket *socket = cls;
+ struct PeerData *peer = cls;
read_task = GNUNET_SCHEDULER_NO_TASK;
- GNUNET_assert (socket == peer2.socket);
- peer2.bytes_read = 0;
- GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
- peer2.io_read_handle =
- GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls,
+ GNUNET_assert (&peer2 == peer);
+ peer->io_read_handle =
+ GNUNET_STREAM_read (peer->socket,
GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 10),
&input_processor,
- cls);
- GNUNET_assert (NULL != peer2.io_read_handle);
+ peer);
+ GNUNET_assert (NULL != peer->io_read_handle);
}
"Peer connected: %s\n", GNUNET_i2s(initiator));
peer2.socket = socket;
- read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, socket);
+ peer2.bytes_read = 0;
+ read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
return GNUNET_OK;
}
GNUNET_assert (NULL != peer1.socket);
}
+
/**
* Initialize framework and start test
*/
abort_task =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 30), &do_abort,
+ (GNUNET_TIME_UNIT_SECONDS, 60), &do_abort,
NULL);
test_task = GNUNET_SCHEDULER_add_now (&test, NULL);