From b1bf8f35ca652ab6371760d7e38d62a1e1bdee9b Mon Sep 17 00:00:00 2001 From: Sree Harsha Totakura Date: Fri, 11 May 2012 08:18:06 +0000 Subject: [PATCH] logging and indentation --- src/stream/stream_api.c | 1544 ++++++++++++++++++++------------------- 1 file changed, 773 insertions(+), 771 deletions(-) diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 3bf3c7863..e8ba24966 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -45,6 +45,8 @@ #include "gnunet_testing_lib.h" #include "stream_protocol.h" +#define LOG(kind,...) \ + GNUNET_log_from (kind, "stream-api", __VA_ARGS__) /** * The maximum packet size of a stream packet @@ -494,31 +496,31 @@ send_message_notify (void *cls, size_t size, void *buf) return 0; /* just to be safe */ GNUNET_PEER_resolve (socket->other_peer, &target); if (0 == size) /* request timed out */ - { - socket->retries++; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Message sending timed out. Retry %d \n", - socket->retries); - socket->transmit_handle = - GNUNET_MESH_notify_transmit_ready (socket->tunnel, - 0, /* Corking */ - 1, /* Priority */ - /* FIXME: exponential backoff */ - socket->retransmit_timeout, - &target, - ntohs (head->message->header.size), - &send_message_notify, - socket); - return 0; - } + { + socket->retries++; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Message sending timed out. Retry %d \n", + socket->retries); + socket->transmit_handle = + GNUNET_MESH_notify_transmit_ready (socket->tunnel, + 0, /* Corking */ + 1, /* Priority */ + /* FIXME: exponential backoff */ + socket->retransmit_timeout, + &target, + ntohs (head->message->header.size), + &send_message_notify, + socket); + return 0; + } ret = ntohs (head->message->header.size); GNUNET_assert (size >= ret); memcpy (buf, head->message, ret); if (NULL != head->finish_cb) - { - head->finish_cb (head->finish_cb_cls, socket); - } + { + head->finish_cb (head->finish_cb_cls, socket); + } GNUNET_CONTAINER_DLL_remove (socket->queue_head, socket->queue_tail, head); @@ -526,19 +528,19 @@ send_message_notify (void *cls, size_t size, void *buf) GNUNET_free (head); head = socket->queue_head; if (NULL != head) /* more pending messages to send */ - { - socket->retries = 0; - socket->transmit_handle = - GNUNET_MESH_notify_transmit_ready (socket->tunnel, - 0, /* Corking */ - 1, /* Priority */ - /* FIXME: exponential backoff */ - socket->retransmit_timeout, - &target, - ntohs (head->message->header.size), - &send_message_notify, - socket); - } + { + socket->retries = 0; + socket->transmit_handle = + GNUNET_MESH_notify_transmit_ready (socket->tunnel, + 0, /* Corking */ + 1, /* Priority */ + /* FIXME: exponential backoff */ + socket->retransmit_timeout, + &target, + ntohs (head->message->header.size), + &send_message_notify, + socket); + } return ret; } @@ -564,10 +566,10 @@ queue_message (struct GNUNET_STREAM_Socket *socket, ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA) && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Queueing message of type %d and size %d\n", - ntohs (message->header.type), - ntohs (message->header.size)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Queueing message of type %d and size %d\n", + ntohs (message->header.type), + ntohs (message->header.size)); GNUNET_assert (NULL != message); queue_entity = GNUNET_malloc (sizeof (struct MessageQueue)); queue_entity->message = message; @@ -632,11 +634,11 @@ send_ack_notify (void *cls, size_t size, void *buf) struct GNUNET_STREAM_Socket *socket = cls; if (0 == size) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s called with size 0\n", __func__); - return 0; - } + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s called with size 0\n", __func__); + return 0; + } GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size); size = ntohs (socket->ack_msg->header.header.size); @@ -673,8 +675,8 @@ retransmission_timeout_task (void *cls, if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason) return; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Retransmitting DATA...\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Retransmitting DATA...\n"); socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; write_data (socket); } @@ -695,9 +697,9 @@ ack_task (void *cls, struct GNUNET_PeerIdentity target; if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason) - { - return; - } + { + return; + } socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; @@ -746,22 +748,22 @@ close_msg_retransmission_task (void *cls, msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); switch (shutdown_handle->operation) - { - case SHUT_RDWR: - msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); - break; - case SHUT_RD: - msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); - break; - case SHUT_WR: - msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); - break; - default: - GNUNET_free (msg); - shutdown_handle->close_msg_retransmission_task_id = - GNUNET_SCHEDULER_NO_TASK; - return; - } + { + case SHUT_RDWR: + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); + break; + case SHUT_RD: + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); + break; + case SHUT_WR: + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); + break; + default: + GNUNET_free (msg); + shutdown_handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_NO_TASK; + return; + } queue_message (socket, msg, NULL, NULL); shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, @@ -822,46 +824,46 @@ write_data (struct GNUNET_STREAM_Socket *socket) ack_packet = -1; /* Find the last acknowledged packet */ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) - { - if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap, - packet)) - ack_packet = packet; - else if (NULL == io_handle->messages[packet]) - break; - } + { + if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap, + packet)) + ack_packet = packet; + else if (NULL == io_handle->messages[packet]) + break; + } /* Resend packets which weren't ack'ed */ for (packet=0; packet < ack_packet; packet++) + { + if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap, + packet)) { - if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap, - packet)) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Placing DATA message with sequence %u in send queue\n", - ntohl (io_handle->messages[packet]->sequence_number)); - - copy_and_queue_message (socket, - &io_handle->messages[packet]->header, - NULL, - NULL); - } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Placing DATA message with sequence %u in send queue\n", + ntohl (io_handle->messages[packet]->sequence_number)); + + copy_and_queue_message (socket, + &io_handle->messages[packet]->header, + NULL, + NULL); } + } packet = ack_packet + 1; /* Now send new packets if there is enough buffer space */ while ( (NULL != io_handle->messages[packet]) && (socket->receiver_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) ) - { - socket->receiver_window_available -= - ntohs (io_handle->messages[packet]->header.header.size); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Placing DATA message with sequence %u in send queue\n", - ntohl (io_handle->messages[packet]->sequence_number)); - copy_and_queue_message (socket, - &io_handle->messages[packet]->header, - NULL, - NULL); - packet++; - } + { + socket->receiver_window_available -= + ntohs (io_handle->messages[packet]->header.header.size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Placing DATA message with sequence %u in send queue\n", + ntohl (io_handle->messages[packet]->sequence_number)); + copy_and_queue_message (socket, + &io_handle->messages[packet]->header, + NULL, + NULL); + packet++; + } if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id) socket->retransmission_timeout_task_id = @@ -901,11 +903,11 @@ call_read_processor (void *cls, /* Check the bitmap for any holes */ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) - { - if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap, - packet)) - break; - } + { + if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap, + packet)) + break; + } /* We only call read processor if we have the first packet */ GNUNET_assert (0 < packet); @@ -919,18 +921,18 @@ call_read_processor (void *cls, socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; /* Call the data processor */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Calling read processor\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Calling read processor\n"); read_size = socket->read_handle->proc (socket->read_handle->proc_cls, socket->status, socket->receive_buffer + socket->copy_offset, valid_read_size); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Read processor read %d bytes\n", - read_size); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Read processor completed successfully\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Read processor read %d bytes\n", + read_size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Read processor completed successfully\n"); /* Free the read handle */ GNUNET_free (socket->read_handle); @@ -941,20 +943,20 @@ call_read_processor (void *cls, /* Determine upto which packet we can remove from the buffer */ for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) - { - if (socket->copy_offset == socket->receive_buffer_boundaries[packet]) - { packet++; break; } - if (socket->copy_offset < socket->receive_buffer_boundaries[packet]) - break; - } + { + if (socket->copy_offset == socket->receive_buffer_boundaries[packet]) + { packet++; break; } + if (socket->copy_offset < socket->receive_buffer_boundaries[packet]) + break; + } /* If no packets can be removed we can't move the buffer */ if (0 == packet) return; sequence_increase = packet; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sequence increase after read processor completion: %u\n", - sequence_increase); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sequence increase after read processor completion: %u\n", + sequence_increase); /* Shift the data in the receive buffer */ memmove (socket->receive_buffer, @@ -979,16 +981,16 @@ call_read_processor (void *cls, /* Fix relative boundaries */ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase) { - if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase) - { - socket->receive_buffer_boundaries[packet] = - socket->receive_buffer_boundaries[packet + sequence_increase] - - offset_increase; - } - else - socket->receive_buffer_boundaries[packet] = 0; + socket->receive_buffer_boundaries[packet] = + socket->receive_buffer_boundaries[packet + sequence_increase] + - offset_increase; } + else + socket->receive_buffer_boundaries[packet] = 0; + } } @@ -1000,7 +1002,7 @@ call_read_processor (void *cls, */ static void read_io_timeout (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_STREAM_Socket *socket = cls; GNUNET_STREAM_DataProcessor proc; @@ -1009,8 +1011,8 @@ read_io_timeout (void *cls, socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Read task timedout - Cancelling it\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Read task timedout - Cancelling it\n"); GNUNET_SCHEDULER_cancel (socket->read_task_id); socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; } @@ -1054,138 +1056,138 @@ handle_data (struct GNUNET_STREAM_Socket *socket, size = htons (msg->header.header.size); if (size < sizeof (struct GNUNET_STREAM_DataMessage)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } if (GNUNET_PEER_search (sender) != socket->other_peer) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received DATA from non-confirming peer\n"); + return GNUNET_YES; + } + + switch (socket->state) + { + case STATE_ESTABLISHED: + case STATE_TRANSMIT_CLOSED: + case STATE_TRANSMIT_CLOSE_WAIT: + + /* check if the message's sequence number is in the range we are + expecting */ + relative_sequence_number = + ntohl (msg->sequence_number) - socket->read_sequence_number; + if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Ignoring received message with sequence number %u\n", + ntohl (msg->sequence_number)); + /* Start ACK sending task if one is not already present */ + if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) + { + socket->ack_task_id = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh + (msg->ack_deadline), + &ack_task, + socket); + } + return GNUNET_YES; + } + + /* Check if we have already seen this message */ + if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, + relative_sequence_number)) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received DATA from non-confirming peer\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Ignoring already received message with sequence " + "number %u\n", + ntohl (msg->sequence_number)); + /* Start ACK sending task if one is not already present */ + if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) + { + socket->ack_task_id = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh + (msg->ack_deadline), + &ack_task, + socket); + } return GNUNET_YES; } - switch (socket->state) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Receiving DATA with sequence number: %u and size: %d from %x\n", + ntohl (msg->sequence_number), + ntohs (msg->header.header.size), + socket->other_peer); + + /* Check if we have to allocate the buffer */ + size -= sizeof (struct GNUNET_STREAM_DataMessage); + relative_offset = ntohl (msg->offset) - socket->read_offset; + bytes_needed = relative_offset + size; + if (bytes_needed > socket->receive_buffer_size) { - case STATE_ESTABLISHED: - case STATE_TRANSMIT_CLOSED: - case STATE_TRANSMIT_CLOSE_WAIT: - - /* check if the message's sequence number is in the range we are - expecting */ - relative_sequence_number = - ntohl (msg->sequence_number) - socket->read_sequence_number; - if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Ignoring received message with sequence number %u\n", - ntohl (msg->sequence_number)); - /* Start ACK sending task if one is not already present */ - if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) - { - socket->ack_task_id = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh - (msg->ack_deadline), - &ack_task, - socket); - } - return GNUNET_YES; - } - - /* Check if we have already seen this message */ - if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, - relative_sequence_number)) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Ignoring already received message with sequence " - "number %u\n", - ntohl (msg->sequence_number)); - /* Start ACK sending task if one is not already present */ - if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) - { - socket->ack_task_id = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh - (msg->ack_deadline), - &ack_task, - socket); - } - return GNUNET_YES; - } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Receiving DATA with sequence number: %u and size: %d from %x\n", - ntohl (msg->sequence_number), - ntohs (msg->header.header.size), - socket->other_peer); - - /* Check if we have to allocate the buffer */ - size -= sizeof (struct GNUNET_STREAM_DataMessage); - relative_offset = ntohl (msg->offset) - socket->read_offset; - bytes_needed = relative_offset + size; - if (bytes_needed > socket->receive_buffer_size) - { - if (bytes_needed <= RECEIVE_BUFFER_SIZE) - { - socket->receive_buffer = GNUNET_realloc (socket->receive_buffer, - bytes_needed); - socket->receive_buffer_size = bytes_needed; - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Cannot accommodate packet %d as buffer is full\n", - ntohl (msg->sequence_number)); - return GNUNET_YES; - } - } + if (bytes_needed <= RECEIVE_BUFFER_SIZE) + { + socket->receive_buffer = GNUNET_realloc (socket->receive_buffer, + bytes_needed); + socket->receive_buffer_size = bytes_needed; + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Cannot accommodate packet %d as buffer is full\n", + ntohl (msg->sequence_number)); + return GNUNET_YES; + } + } - /* Copy Data to buffer */ - payload = &msg[1]; - GNUNET_assert (relative_offset + size <= socket->receive_buffer_size); - memcpy (socket->receive_buffer + relative_offset, - payload, - size); - socket->receive_buffer_boundaries[relative_sequence_number] = - relative_offset + size; + /* Copy Data to buffer */ + payload = &msg[1]; + GNUNET_assert (relative_offset + size <= socket->receive_buffer_size); + memcpy (socket->receive_buffer + relative_offset, + payload, + size); + socket->receive_buffer_boundaries[relative_sequence_number] = + relative_offset + size; - /* Modify the ACK bitmap */ - ackbitmap_modify_bit (&socket->ack_bitmap, - relative_sequence_number, - GNUNET_YES); + /* Modify the ACK bitmap */ + ackbitmap_modify_bit (&socket->ack_bitmap, + relative_sequence_number, + GNUNET_YES); - /* Start ACK sending task if one is not already present */ - if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) - { - socket->ack_task_id = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh - (msg->ack_deadline), - &ack_task, - socket); - } - - if ((NULL != socket->read_handle) /* A read handle is waiting */ - /* There is no current read task */ - && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id) - /* We have the first packet */ - && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, - 0))) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Scheduling read processor\n"); - - socket->read_task_id = - GNUNET_SCHEDULER_add_now (&call_read_processor, + /* Start ACK sending task if one is not already present */ + if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) + { + socket->ack_task_id = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh + (msg->ack_deadline), + &ack_task, socket); - } - - break; + } - default: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received data message when it cannot be handled\n"); - break; + if ((NULL != socket->read_handle) /* A read handle is waiting */ + /* There is no current read task */ + && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id) + /* We have the first packet */ + && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, + 0))) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Scheduling read processor\n"); + + socket->read_task_id = + GNUNET_SCHEDULER_add_now (&call_read_processor, + socket); } + + break; + + default: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received data message when it cannot be handled\n"); + break; + } return GNUNET_YES; } @@ -1204,11 +1206,11 @@ handle_data (struct GNUNET_STREAM_Socket *socket, */ static int client_handle_data (void *cls, - struct GNUNET_MESH_Tunnel *tunnel, - void **tunnel_ctx, - const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_MessageHeader *message, - const struct GNUNET_ATS_Information*atsi) + struct GNUNET_MESH_Tunnel *tunnel, + void **tunnel_ctx, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_ATS_Information*atsi) { struct GNUNET_STREAM_Socket *socket = cls; @@ -1232,28 +1234,28 @@ set_state_established (void *cls, { struct GNUNET_PeerIdentity initiator_pid; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Attaining ESTABLISHED state\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Attaining ESTABLISHED state\n"); socket->write_offset = 0; socket->read_offset = 0; socket->state = STATE_ESTABLISHED; /* FIXME: What if listen_cb is NULL */ if (NULL != socket->lsocket) + { + GNUNET_PEER_resolve (socket->other_peer, &initiator_pid); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Calling listen callback\n"); + if (GNUNET_SYSERR == + socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls, + socket, + &initiator_pid)) { - GNUNET_PEER_resolve (socket->other_peer, &initiator_pid); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Calling listen callback\n"); - if (GNUNET_SYSERR == - socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls, - socket, - &initiator_pid)) - { - socket->state = STATE_CLOSED; - /* FIXME: We should close in a decent way */ - GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */ - GNUNET_free (socket); - } + socket->state = STATE_CLOSED; + /* FIXME: We should close in a decent way */ + GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */ + GNUNET_free (socket); } + } else if (socket->open_cb) socket->open_cb (socket->open_cls, socket); } @@ -1270,8 +1272,8 @@ set_state_hello_wait (void *cls, struct GNUNET_STREAM_Socket *socket) { GNUNET_assert (STATE_INIT == socket->state); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Attaining HELLO_WAIT state\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Attaining HELLO_WAIT state\n"); socket->state = STATE_HELLO_WAIT; } @@ -1286,8 +1288,8 @@ static void set_state_close_wait (void *cls, struct GNUNET_STREAM_Socket *socket) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Attaing CLOSE_WAIT state\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Attaing CLOSE_WAIT state\n"); socket->state = STATE_CLOSE_WAIT; GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */ socket->receive_buffer = NULL; @@ -1305,8 +1307,8 @@ static void set_state_receive_close_wait (void *cls, struct GNUNET_STREAM_Socket *socket) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Attaing RECEIVE_CLOSE_WAIT state\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Attaing RECEIVE_CLOSE_WAIT state\n"); socket->state = STATE_RECEIVE_CLOSE_WAIT; GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */ socket->receive_buffer = NULL; @@ -1324,8 +1326,8 @@ static void set_state_transmit_close_wait (void *cls, struct GNUNET_STREAM_Socket *socket) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Attaining TRANSMIT_CLOSE_WAIT state\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Attaining TRANSMIT_CLOSE_WAIT state\n"); socket->state = STATE_TRANSMIT_CLOSE_WAIT; } @@ -1358,9 +1360,9 @@ generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket) /* Get the random sequence number */ socket->write_sequence_number = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Generated write sequence number %u\n", - (unsigned int) socket->write_sequence_number); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Generated write sequence number %u\n", + (unsigned int) socket->write_sequence_number); msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); msg->header.header.size = @@ -1398,24 +1400,24 @@ client_handle_hello_ack (void *cls, struct GNUNET_STREAM_HelloAckMessage *reply; if (GNUNET_PEER_search (sender) != socket->other_peer) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received HELLO_ACK from non-confirming peer\n"); - return GNUNET_YES; - } + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received HELLO_ACK from non-confirming peer\n"); + return GNUNET_YES; + } ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received HELLO_ACK from %x\n", - socket->other_peer); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received HELLO_ACK from %x\n", + socket->other_peer); GNUNET_assert (socket->tunnel == tunnel); switch (socket->state) { case STATE_HELLO_WAIT: socket->read_sequence_number = ntohl (ack_msg->sequence_number); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Read sequence number %u\n", - (unsigned int) socket->read_sequence_number); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Read sequence number %u\n", + (unsigned int) socket->read_sequence_number); socket->receiver_window_available = ntohl (ack_msg->receiver_window_size); reply = generate_hello_ack_msg (socket); queue_message (socket, @@ -1429,10 +1431,10 @@ client_handle_hello_ack (void *cls, return GNUNET_OK; case STATE_INIT: default: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Server %x sent HELLO_ACK when in state %d\n", - socket->other_peer, - socket->state); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Server %x sent HELLO_ACK when in state %d\n", + socket->other_peer, + socket->state); socket->state = STATE_CLOSED; // introduce STATE_ERROR? return GNUNET_SYSERR; } @@ -1487,22 +1489,22 @@ handle_transmit_close (struct GNUNET_STREAM_Socket *socket, struct GNUNET_STREAM_MessageHeader *reply; switch (socket->state) - { - case STATE_ESTABLISHED: - socket->state = STATE_RECEIVE_CLOSED; + { + case STATE_ESTABLISHED: + socket->state = STATE_RECEIVE_CLOSED; - /* Send TRANSMIT_CLOSE_ACK */ - reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); - reply->header.type = - htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK); - reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); - queue_message (socket, reply, NULL, NULL); - break; + /* Send TRANSMIT_CLOSE_ACK */ + reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + reply->header.type = + htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK); + reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + queue_message (socket, reply, NULL, NULL); + break; - default: - /* FIXME: Call statistics? */ - break; - } + default: + /* FIXME: Call statistics? */ + break; + } return GNUNET_YES; } @@ -1561,86 +1563,86 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket, shutdown_handle = socket->shutdown_handle; if (NULL == shutdown_handle) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received *CLOSE_ACK when shutdown handle is NULL\n"); + return GNUNET_OK; + } + + switch (operation) + { + case SHUT_RDWR: + switch (socket->state) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received *CLOSE_ACK when shutdown handle is NULL\n"); + case STATE_CLOSE_WAIT: + if (SHUT_RDWR != shutdown_handle->operation) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received CLOSE_ACK when shutdown handle is not for SHUT_RDWR\n"); + return GNUNET_OK; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received CLOSE_ACK from %x\n", + socket->other_peer); + socket->state = STATE_CLOSED; + break; + default: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received CLOSE_ACK when in it not expected\n"); return GNUNET_OK; } + break; - switch (operation) + case SHUT_RD: + switch (socket->state) { - case SHUT_RDWR: - switch (socket->state) - { - case STATE_CLOSE_WAIT: - if (SHUT_RDWR != shutdown_handle->operation) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received CLOSE_ACK when shutdown handle is not for SHUT_RDWR\n"); - return GNUNET_OK; - } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received CLOSE_ACK from %x\n", - socket->other_peer); - socket->state = STATE_CLOSED; - break; - default: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received CLOSE_ACK when in it not expected\n"); - return GNUNET_OK; - } + case STATE_RECEIVE_CLOSE_WAIT: + if (SHUT_RD != shutdown_handle->operation) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received RECEIVE_CLOSE_ACK when shutdown handle is not for SHUT_RD\n"); + return GNUNET_OK; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received RECEIVE_CLOSE_ACK from %x\n", + socket->other_peer); + socket->state = STATE_RECEIVE_CLOSED; break; + default: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received RECEIVE_CLOSE_ACK when in it not expected\n"); + return GNUNET_OK; + } - case SHUT_RD: - switch (socket->state) - { - case STATE_RECEIVE_CLOSE_WAIT: - if (SHUT_RD != shutdown_handle->operation) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received RECEIVE_CLOSE_ACK when shutdown handle is not for SHUT_RD\n"); - return GNUNET_OK; - } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received RECEIVE_CLOSE_ACK from %x\n", - socket->other_peer); - socket->state = STATE_RECEIVE_CLOSED; - break; - default: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received RECEIVE_CLOSE_ACK when in it not expected\n"); - return GNUNET_OK; - } + break; + case SHUT_WR: + switch (socket->state) + { + case STATE_TRANSMIT_CLOSE_WAIT: + if (SHUT_WR != shutdown_handle->operation) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received TRANSMIT_CLOSE_ACK when shutdown handle is not for SHUT_WR\n"); + return GNUNET_OK; + } - break; - case SHUT_WR: - switch (socket->state) - { - case STATE_TRANSMIT_CLOSE_WAIT: - if (SHUT_WR != shutdown_handle->operation) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received TRANSMIT_CLOSE_ACK when shutdown handle is not for SHUT_WR\n"); - return GNUNET_OK; - } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received TRANSMIT_CLOSE_ACK from %x\n", - socket->other_peer); - socket->state = STATE_TRANSMIT_CLOSED; - break; - default: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received TRANSMIT_CLOSE_ACK when in it not expected\n"); - - return GNUNET_OK; - } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received TRANSMIT_CLOSE_ACK from %x\n", + socket->other_peer); + socket->state = STATE_TRANSMIT_CLOSED; break; default: - GNUNET_assert (0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received TRANSMIT_CLOSE_ACK when in it not expected\n"); + + return GNUNET_OK; } + break; + default: + GNUNET_assert (0); + } if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */ shutdown_handle->completion_cb(shutdown_handle->completion_cls, @@ -1649,12 +1651,12 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket, socket->shutdown_handle = NULL; if (GNUNET_SCHEDULER_NO_TASK != shutdown_handle->close_msg_retransmission_task_id) - { - GNUNET_SCHEDULER_cancel - (shutdown_handle->close_msg_retransmission_task_id); - shutdown_handle->close_msg_retransmission_task_id = - GNUNET_SCHEDULER_NO_TASK; - } + { + GNUNET_SCHEDULER_cancel + (shutdown_handle->close_msg_retransmission_task_id); + shutdown_handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_NO_TASK; + } return GNUNET_OK; } @@ -1712,20 +1714,20 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket, struct GNUNET_STREAM_MessageHeader *receive_close_ack; switch (socket->state) - { - case STATE_INIT: - case STATE_LISTEN: - case STATE_HELLO_WAIT: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Ignoring RECEIVE_CLOSE as it cannot be handled now\n"); - return GNUNET_OK; - default: - break; - } + { + case STATE_INIT: + case STATE_LISTEN: + case STATE_HELLO_WAIT: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Ignoring RECEIVE_CLOSE as it cannot be handled now\n"); + return GNUNET_OK; + default: + break; + } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received RECEIVE_CLOSE from %x\n", - socket->other_peer); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received RECEIVE_CLOSE from %x\n", + socket->other_peer); receive_close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); receive_close_ack->header.size = @@ -1738,8 +1740,8 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket, NULL); /* FIXME: Handle the case where write handle is present; the write operation - should be deemed as finised and the write continuation callback - has to be called with the stream status GNUNET_STREAM_SHUTDOWN */ + should be deemed as finised and the write continuation callback + has to be called with the stream status GNUNET_STREAM_SHUTDOWN */ return GNUNET_OK; } @@ -1828,20 +1830,20 @@ handle_close (struct GNUNET_STREAM_Socket *socket, struct GNUNET_STREAM_MessageHeader *close_ack; switch (socket->state) - { - case STATE_INIT: - case STATE_LISTEN: - case STATE_HELLO_WAIT: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Ignoring RECEIVE_CLOSE as it cannot be handled now\n"); - return GNUNET_OK; - default: - break; - } + { + case STATE_INIT: + case STATE_LISTEN: + case STATE_HELLO_WAIT: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Ignoring RECEIVE_CLOSE as it cannot be handled now\n"); + return GNUNET_OK; + default: + break; + } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received CLOSE from %x\n", - socket->other_peer); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received CLOSE from %x\n", + socket->other_peer); close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK); @@ -1978,34 +1980,34 @@ server_handle_hello (void *cls, struct GNUNET_STREAM_HelloAckMessage *reply; if (GNUNET_PEER_search (sender) != socket->other_peer) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received HELLO from non-confirming peer\n"); - return GNUNET_YES; - } + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received HELLO from non-confirming peer\n"); + return GNUNET_YES; + } GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type)); GNUNET_assert (socket->tunnel == tunnel); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received HELLO from %x\n", - socket->other_peer); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received HELLO from %x\n", + socket->other_peer); if (STATE_INIT == socket->state) - { - reply = generate_hello_ack_msg (socket); - queue_message (socket, - &reply->header, - &set_state_hello_wait, - NULL); - } + { + reply = generate_hello_ack_msg (socket); + queue_message (socket, + &reply->header, + &set_state_hello_wait, + NULL); + } else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client sent HELLO when in state %d\n", socket->state); - /* FIXME: Send RESET? */ + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client sent HELLO when in state %d\n", socket->state); + /* FIXME: Send RESET? */ - } + } return GNUNET_OK; } @@ -2038,26 +2040,26 @@ server_handle_hello_ack (void *cls, GNUNET_assert (socket->tunnel == tunnel); ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message; if (STATE_HELLO_WAIT == socket->state) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received HELLO_ACK from %x\n", - socket->other_peer); - socket->read_sequence_number = ntohl (ack_message->sequence_number); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Read sequence number %u\n", - (unsigned int) socket->read_sequence_number); - socket->receiver_window_available = - ntohl (ack_message->receiver_window_size); - /* Attain ESTABLISHED state */ - set_state_established (NULL, socket); - } + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received HELLO_ACK from %x\n", + socket->other_peer); + socket->read_sequence_number = ntohl (ack_message->sequence_number); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Read sequence number %u\n", + (unsigned int) socket->read_sequence_number); + socket->receiver_window_available = + ntohl (ack_message->receiver_window_size); + /* Attain ESTABLISHED state */ + set_state_established (NULL, socket); + } else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client sent HELLO_ACK when in state %d\n", socket->state); - /* FIXME: Send RESET? */ + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client sent HELLO_ACK when in state %d\n", socket->state); + /* FIXME: Send RESET? */ - } + } return GNUNET_OK; } @@ -2299,113 +2301,113 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, if (GNUNET_PEER_search (sender) != socket->other_peer) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received ACK from non-confirming peer\n"); - return GNUNET_YES; - } + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received ACK from non-confirming peer\n"); + return GNUNET_YES; + } switch (socket->state) + { + case (STATE_ESTABLISHED): + case (STATE_RECEIVE_CLOSED): + case (STATE_RECEIVE_CLOSE_WAIT): + if (NULL == socket->write_handle) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received DATA_ACK when write_handle is NULL\n"); + return GNUNET_OK; + } + /* FIXME: increment in the base sequence number is breaking current flow + */ + if (!((socket->write_sequence_number + - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)) { - case (STATE_ESTABLISHED): - case (STATE_RECEIVE_CLOSED): - case (STATE_RECEIVE_CLOSE_WAIT): - if (NULL == socket->write_handle) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received DATA_ACK when write_handle is NULL\n"); - return GNUNET_OK; - } - /* FIXME: increment in the base sequence number is breaking current flow - */ - if (!((socket->write_sequence_number - - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received DATA_ACK with unexpected base sequence number\n"); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Current write sequence: %u; Ack's base sequence: %u\n", - socket->write_sequence_number, - ntohl (ack->base_sequence_number)); - return GNUNET_OK; - } - /* FIXME: include the case when write_handle is cancelled - ignore the - acks */ - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received DATA_ACK from %x\n", - socket->other_peer); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received DATA_ACK with unexpected base sequence number\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Current write sequence: %u; Ack's base sequence: %u\n", + socket->write_sequence_number, + ntohl (ack->base_sequence_number)); + return GNUNET_OK; + } + /* FIXME: include the case when write_handle is cancelled - ignore the + acks */ + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received DATA_ACK from %x\n", + socket->other_peer); - /* Cancel the retransmission task */ - if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) - { - GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); - socket->retransmission_timeout_task_id = - GNUNET_SCHEDULER_NO_TASK; - } + /* Cancel the retransmission task */ + if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) + { + GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); + socket->retransmission_timeout_task_id = + GNUNET_SCHEDULER_NO_TASK; + } - for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) - { - if (NULL == socket->write_handle->messages[packet]) break; - if (ntohl (ack->base_sequence_number) - >= ntohl (socket->write_handle->messages[packet]->sequence_number)) - ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, - packet, - GNUNET_YES); - else - if (GNUNET_YES == - ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap, - ntohl (socket->write_handle->messages[packet]->sequence_number) - - ntohl (ack->base_sequence_number))) - ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, - packet, - GNUNET_YES); - } - - /* Update the receive window remaining + for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (NULL == socket->write_handle->messages[packet]) break; + if (ntohl (ack->base_sequence_number) + >= ntohl (socket->write_handle->messages[packet]->sequence_number)) + ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, + packet, + GNUNET_YES); + else + if (GNUNET_YES == + ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap, + ntohl (socket->write_handle->messages[packet]->sequence_number) + - ntohl (ack->base_sequence_number))) + ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, + packet, + GNUNET_YES); + } + + /* Update the receive window remaining FIXME : Should update with the value from a data ack with greater sequence number */ - socket->receiver_window_available = - ntohl (ack->receive_window_remaining); + socket->receiver_window_available = + ntohl (ack->receive_window_remaining); - /* Check if we have received all acknowledgements */ - need_retransmission = GNUNET_NO; + /* Check if we have received all acknowledgements */ + need_retransmission = GNUNET_NO; + for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (NULL == socket->write_handle->messages[packet]) break; + if (GNUNET_YES != ackbitmap_is_bit_set + (&socket->write_handle->ack_bitmap,packet)) + { + need_retransmission = GNUNET_YES; + break; + } + } + if (GNUNET_YES == need_retransmission) + { + write_data (socket); + } + else /* We have to call the write continuation callback now */ + { + /* Free the packets */ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) - { - if (NULL == socket->write_handle->messages[packet]) break; - if (GNUNET_YES != ackbitmap_is_bit_set - (&socket->write_handle->ack_bitmap,packet)) - { - need_retransmission = GNUNET_YES; - break; - } - } - if (GNUNET_YES == need_retransmission) - { - write_data (socket); - } - else /* We have to call the write continuation callback now */ - { - /* Free the packets */ - for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) - { - GNUNET_free_non_null (socket->write_handle->messages[packet]); - } - if (NULL != socket->write_handle->write_cont) - socket->write_handle->write_cont - (socket->write_handle->write_cont_cls, - socket->status, - socket->write_handle->size); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Write completion callback completed\n"); - /* We are done with the write handle - Freeing it */ - GNUNET_free (socket->write_handle); - socket->write_handle = NULL; - } - break; - default: - break; + { + GNUNET_free_non_null (socket->write_handle->messages[packet]); + } + if (NULL != socket->write_handle->write_cont) + socket->write_handle->write_cont + (socket->write_handle->write_cont_cls, + socket->status, + socket->write_handle->size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Write completion callback completed\n"); + /* We are done with the write handle - Freeing it */ + GNUNET_free (socket->write_handle); + socket->write_handle = NULL; } + break; + default: + break; + } return GNUNET_OK; } @@ -2541,15 +2543,15 @@ mesh_peer_connect_callback (void *cls, connected_peer = GNUNET_PEER_search (peer); if (connected_peer != socket->other_peer) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "A peer which is not our target has connected to our tunnel\n"); - return; - } + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "A peer which is not our target has connected to our tunnel\n"); + return; + } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Target peer %x connected\n", - connected_peer); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Target peer %x connected\n", + connected_peer); /* Set state to INIT */ socket->state = STATE_INIT; @@ -2565,10 +2567,10 @@ mesh_peer_connect_callback (void *cls, /* Call open callback */ if (NULL == socket->open_cb) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "STREAM_open callback is NULL\n"); - } + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "STREAM_open callback is NULL\n"); + } } @@ -2585,9 +2587,9 @@ mesh_peer_disconnect_callback (void *cls, struct GNUNET_STREAM_Socket *socket=cls; /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Other peer %x disconnected\n", - socket->other_peer); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Other peer %x disconnected\n", + socket->other_peer); } @@ -2619,9 +2621,9 @@ new_tunnel_notify (void *cls, socket->session_id = 0; /* FIXME */ socket->state = STATE_INIT; socket->lsocket = lsocket; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Peer %x initiated tunnel to us\n", - socket->other_peer); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Peer %x initiated tunnel to us\n", + socket->other_peer); /* FIXME: Copy MESH handle from lsocket to socket */ @@ -2652,36 +2654,36 @@ tunnel_cleaner (void *cls, return; GNUNET_break_op(0); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Peer %x has terminated connection abruptly\n", - socket->other_peer); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Peer %x has terminated connection abruptly\n", + socket->other_peer); socket->status = GNUNET_STREAM_SHUTDOWN; /* Clear Transmit handles */ if (NULL != socket->transmit_handle) - { - GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); - socket->transmit_handle = NULL; - } + { + GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); + socket->transmit_handle = NULL; + } if (NULL != socket->ack_transmit_handle) - { - GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle); - GNUNET_free (socket->ack_msg); - socket->ack_msg = NULL; - socket->ack_transmit_handle = NULL; - } + { + GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle); + GNUNET_free (socket->ack_msg); + socket->ack_msg = NULL; + socket->ack_transmit_handle = NULL; + } /* Stop Tasks using socket->tunnel */ if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id) - { - GNUNET_SCHEDULER_cancel (socket->ack_task_id); - socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; - } + { + GNUNET_SCHEDULER_cancel (socket->ack_task_id); + socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; + } if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) - { - GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); - socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; - } + { + GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); + socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + } /* FIXME: Cancel all other tasks using socket->tunnel */ socket->tunnel = NULL; } @@ -2718,8 +2720,8 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_MESH_ApplicationType ports[] = {app_port, 0}; va_list vargs; /* Variable arguments */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s\n", __func__); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s\n", __func__); socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); socket->other_peer = GNUNET_PEER_intern (target); @@ -2733,15 +2735,15 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, do { option = va_arg (vargs, enum GNUNET_STREAM_Option); switch (option) - { - case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT: - /* Expect struct GNUNET_TIME_Relative */ - socket->retransmit_timeout = va_arg (vargs, - struct GNUNET_TIME_Relative); - break; - case GNUNET_STREAM_OPTION_END: - break; - } + { + case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT: + /* Expect struct GNUNET_TIME_Relative */ + socket->retransmit_timeout = va_arg (vargs, + struct GNUNET_TIME_Relative); + break; + case GNUNET_STREAM_OPTION_END: + break; + } } while (GNUNET_STREAM_OPTION_END != option); va_end (vargs); /* End of variable args parsing */ socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */ @@ -2752,14 +2754,14 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, client_message_handlers, ports); /* We don't get inbound tunnels */ if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */ - { - GNUNET_free (socket); - return NULL; - } + { + GNUNET_free (socket); + return NULL; + } /* Now create the mesh tunnel to target */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating MESH Tunnel\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating MESH Tunnel\n"); socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh, NULL, /* Tunnel context */ &mesh_peer_connect_callback, @@ -2769,8 +2771,8 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_MESH_peer_request_connect_add (socket->tunnel, target); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s() END\n", __func__); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", __func__); return socket; } @@ -2805,55 +2807,55 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); switch (operation) - { - case SHUT_RD: - handle->operation = SHUT_RD; - if (NULL != socket->read_handle) - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Existing read handle should be cancelled before shutting" - " down reading\n"); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); - queue_message (socket, - msg, - &set_state_receive_close_wait, - NULL); - break; - case SHUT_WR: - handle->operation = SHUT_WR; - if (NULL != socket->write_handle) - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Existing write handle should be cancelled before shutting" - " down writing\n"); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); - queue_message (socket, - msg, - &set_state_transmit_close_wait, - NULL); - break; - case SHUT_RDWR: - handle->operation = SHUT_RDWR; - if (NULL != socket->write_handle) - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Existing write handle should be cancelled before shutting" - " down writing\n"); - if (NULL != socket->read_handle) - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Existing read handle should be cancelled before shutting" - " down reading\n"); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); - queue_message (socket, - msg, - &set_state_close_wait, - NULL); - break; - default: - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "GNUNET_STREAM_shutdown called with invalid value for " - "parameter operation -- Ignoring\n"); - GNUNET_free (msg); - GNUNET_free (handle); - return NULL; - } + { + case SHUT_RD: + handle->operation = SHUT_RD; + if (NULL != socket->read_handle) + LOG (GNUNET_ERROR_TYPE_WARNING, + "Existing read handle should be cancelled before shutting" + " down reading\n"); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); + queue_message (socket, + msg, + &set_state_receive_close_wait, + NULL); + break; + case SHUT_WR: + handle->operation = SHUT_WR; + if (NULL != socket->write_handle) + LOG (GNUNET_ERROR_TYPE_WARNING, + "Existing write handle should be cancelled before shutting" + " down writing\n"); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); + queue_message (socket, + msg, + &set_state_transmit_close_wait, + NULL); + break; + case SHUT_RDWR: + handle->operation = SHUT_RDWR; + if (NULL != socket->write_handle) + LOG (GNUNET_ERROR_TYPE_WARNING, + "Existing write handle should be cancelled before shutting" + " down writing\n"); + if (NULL != socket->read_handle) + LOG (GNUNET_ERROR_TYPE_WARNING, + "Existing read handle should be cancelled before shutting" + " down reading\n"); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); + queue_message (socket, + msg, + &set_state_close_wait, + NULL); + break; + default: + LOG (GNUNET_ERROR_TYPE_WARNING, + "GNUNET_STREAM_shutdown called with invalid value for " + "parameter operation -- Ignoring\n"); + GNUNET_free (msg); + GNUNET_free (handle); + return NULL; + } handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, &close_msg_retransmission_task, @@ -2891,33 +2893,33 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) GNUNET_break (NULL == socket->write_handle); if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) - { - /* socket closed with read task pending!? */ - GNUNET_break (0); - GNUNET_SCHEDULER_cancel (socket->read_task_id); - socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; - } + { + /* socket closed with read task pending!? */ + GNUNET_break (0); + GNUNET_SCHEDULER_cancel (socket->read_task_id); + socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; + } /* Terminate the ack'ing tasks if they are still present */ if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (socket->ack_task_id); - socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; - } + { + GNUNET_SCHEDULER_cancel (socket->ack_task_id); + socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; + } /* Clear Transmit handles */ if (NULL != socket->transmit_handle) - { - GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); - socket->transmit_handle = NULL; - } + { + GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); + socket->transmit_handle = NULL; + } if (NULL != socket->ack_transmit_handle) - { - GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle); - GNUNET_free (socket->ack_msg); - socket->ack_msg = NULL; - socket->ack_transmit_handle = NULL; - } + { + GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle); + GNUNET_free (socket->ack_msg); + socket->ack_msg = NULL; + socket->ack_transmit_handle = NULL; + } /* Clear existing message queue */ while (NULL != (head = socket->queue_head)) { @@ -2930,23 +2932,23 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) /* Close associated tunnel */ if (NULL != socket->tunnel) - { - GNUNET_MESH_tunnel_destroy (socket->tunnel); - socket->tunnel = NULL; - } + { + GNUNET_MESH_tunnel_destroy (socket->tunnel); + socket->tunnel = NULL; + } /* Close mesh connection */ if (NULL != socket->mesh && NULL == socket->lsocket) - { - GNUNET_MESH_disconnect (socket->mesh); - socket->mesh = NULL; - } + { + GNUNET_MESH_disconnect (socket->mesh); + socket->mesh = NULL; + } /* Release receive buffer */ if (NULL != socket->receive_buffer) - { - GNUNET_free (socket->receive_buffer); - } + { + GNUNET_free (socket->receive_buffer); + } GNUNET_free (socket); } @@ -3039,8 +3041,8 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, const void *sweep; struct GNUNET_TIME_Relative ack_deadline; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s\n", __func__); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s\n", __func__); /* Return NULL if there is already a write request pending */ if (NULL != socket->write_handle) @@ -3050,30 +3052,30 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, } switch (socket->state) - { - case STATE_TRANSMIT_CLOSED: - case STATE_TRANSMIT_CLOSE_WAIT: - case STATE_CLOSED: - case STATE_CLOSE_WAIT: - if (NULL != write_cont) - write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s() END\n", __func__); - return NULL; - case STATE_INIT: - case STATE_LISTEN: - case STATE_HELLO_WAIT: - if (NULL != write_cont) - /* FIXME: GNUNET_STREAM_SYSERR?? */ - write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s() END\n", __func__); - return NULL; - case STATE_ESTABLISHED: - case STATE_RECEIVE_CLOSED: - case STATE_RECEIVE_CLOSE_WAIT: - break; - } + { + case STATE_TRANSMIT_CLOSED: + case STATE_TRANSMIT_CLOSE_WAIT: + case STATE_CLOSED: + case STATE_CLOSE_WAIT: + if (NULL != write_cont) + write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", __func__); + return NULL; + case STATE_INIT: + case STATE_LISTEN: + case STATE_HELLO_WAIT: + if (NULL != write_cont) + /* FIXME: GNUNET_STREAM_SYSERR?? */ + write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", __func__); + return NULL; + case STATE_ESTABLISHED: + case STATE_RECEIVE_CLOSED: + case STATE_RECEIVE_CLOSE_WAIT: + break; + } if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size) size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size; @@ -3089,43 +3091,43 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5); /* Divide the given buffer into packets for sending */ for (packet=0; packet < num_needed_packets; packet++) + { + if ((packet + 1) * max_payload_size < size) { - if ((packet + 1) * max_payload_size < size) - { - payload_size = max_payload_size; - packet_size = MAX_PACKET_SIZE; - } - else - { - payload_size = size - packet * max_payload_size; - packet_size = payload_size + sizeof (struct - GNUNET_STREAM_DataMessage); - } - io_handle->messages[packet] = GNUNET_malloc (packet_size); - io_handle->messages[packet]->header.header.size = htons (packet_size); - io_handle->messages[packet]->header.header.type = - htons (GNUNET_MESSAGE_TYPE_STREAM_DATA); - io_handle->messages[packet]->sequence_number = - htonl (socket->write_sequence_number++); - io_handle->messages[packet]->offset = htonl (socket->write_offset); - - /* FIXME: Remove the fixed delay for ack deadline; Set it to the value - determined from RTT */ - io_handle->messages[packet]->ack_deadline = - GNUNET_TIME_relative_hton (ack_deadline); - data_msg = io_handle->messages[packet]; - /* Copy data from given buffer to the packet */ - memcpy (&data_msg[1], - sweep, - payload_size); - sweep += payload_size; - socket->write_offset += payload_size; + payload_size = max_payload_size; + packet_size = MAX_PACKET_SIZE; } + else + { + payload_size = size - packet * max_payload_size; + packet_size = payload_size + sizeof (struct + GNUNET_STREAM_DataMessage); + } + io_handle->messages[packet] = GNUNET_malloc (packet_size); + io_handle->messages[packet]->header.header.size = htons (packet_size); + io_handle->messages[packet]->header.header.type = + htons (GNUNET_MESSAGE_TYPE_STREAM_DATA); + io_handle->messages[packet]->sequence_number = + htonl (socket->write_sequence_number++); + io_handle->messages[packet]->offset = htonl (socket->write_offset); + + /* FIXME: Remove the fixed delay for ack deadline; Set it to the value + determined from RTT */ + io_handle->messages[packet]->ack_deadline = + GNUNET_TIME_relative_hton (ack_deadline); + data_msg = io_handle->messages[packet]; + /* Copy data from given buffer to the packet */ + memcpy (&data_msg[1], + sweep, + payload_size); + sweep += payload_size; + socket->write_offset += payload_size; + } socket->write_handle = io_handle; write_data (socket); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s() END\n", __func__); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", __func__); return io_handle; } @@ -3152,30 +3154,30 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, { struct GNUNET_STREAM_IOReadHandle *read_handle; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s()\n", - __func__); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s()\n", + __func__); /* Return NULL if there is already a read handle; the user has to cancel that - first before continuing or has to wait until it is completed */ + first before continuing or has to wait until it is completed */ if (NULL != socket->read_handle) return NULL; GNUNET_assert (NULL != proc); switch (socket->state) - { - case STATE_RECEIVE_CLOSED: - case STATE_RECEIVE_CLOSE_WAIT: - case STATE_CLOSED: - case STATE_CLOSE_WAIT: - proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s() END\n", - __func__); - return NULL; - default: - break; - } + { + case STATE_RECEIVE_CLOSED: + case STATE_RECEIVE_CLOSE_WAIT: + case STATE_CLOSED: + case STATE_CLOSE_WAIT: + proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", + __func__); + return NULL; + default: + break; + } read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle)); read_handle->proc = proc; @@ -3185,20 +3187,20 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, /* Check if we have a packet at bitmap 0 */ if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, 0)) - { - socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor, - socket); + { + socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor, + socket); - } + } /* Setup the read timeout task */ socket->read_io_timeout_task_id = GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s() END\n", - __func__); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", + __func__); return read_handle; } @@ -3218,16 +3220,16 @@ GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh) GNUNET_assert (socket->write_handle == ioh); if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) - { - GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); - socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; - } + { + GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); + socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + } for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) - { - if (NULL == ioh->messages[packet]) break; - GNUNET_free (ioh->messages[packet]); - } + { + if (NULL == ioh->messages[packet]) break; + GNUNET_free (ioh->messages[packet]); + } GNUNET_free (socket->write_handle); socket->write_handle = NULL; -- 2.25.1