X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fstream%2Fstream_api.c;h=3bf3c7863df49b348ece91bf5280d20e51e9f115;hb=7f96c0370dfa90c1c6c64ae63e69c8ec3ae8b47f;hp=535850de2a02cf1d49462cca150eb6af03d1c412;hpb=06dc842c43cd3e35aa166bb025a7093bbc0cac7e;p=oweals%2Fgnunet.git diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 535850de2..3bf3c7863 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -211,6 +211,16 @@ struct GNUNET_STREAM_Socket */ struct GNUNET_MESH_TransmitHandle *transmit_handle; + /** + * The current act transmit handle (if a pending ack transmit request exists) + */ + struct GNUNET_MESH_TransmitHandle *ack_transmit_handle; + + /** + * Pointer to the current ack message using in ack_task + */ + struct GNUNET_STREAM_AckMessage *ack_msg; + /** * The current message associated with the transmit handle */ @@ -287,11 +297,6 @@ struct GNUNET_STREAM_Socket */ GNUNET_PEER_Id other_peer; - /** - * Our Peer Identity (for debugging) - */ - GNUNET_PEER_Id our_id; - /** * The application port number (type: uint32_t) */ @@ -366,11 +371,6 @@ struct GNUNET_STREAM_ListenSocket */ void *listen_cb_cls; - /** - * Our interned Peer's identity - */ - GNUNET_PEER_Id our_id; - /** * The service port * FIXME: Remove if not required! @@ -444,11 +444,6 @@ struct GNUNET_STREAM_ShutdownHandle */ struct GNUNET_STREAM_Socket *socket; - /** - * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR - */ - int operation; - /** * Shutdown completion callback */ @@ -458,6 +453,16 @@ struct GNUNET_STREAM_ShutdownHandle * Closure for completion callback */ void *completion_cls; + + /** + * Close message retransmission task id + */ + GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id; + + /** + * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR + */ + int operation; }; @@ -560,8 +565,7 @@ queue_message (struct GNUNET_STREAM_Socket *socket, && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Queueing message of type %d and size %d\n", - socket->our_id, + "Queueing message of type %d and size %d\n", ntohs (message->header.type), ntohs (message->header.size)); GNUNET_assert (NULL != message); @@ -625,7 +629,7 @@ copy_and_queue_message (struct GNUNET_STREAM_Socket *socket, static size_t send_ack_notify (void *cls, size_t size, void *buf) { - struct GNUNET_STREAM_AckMessage *ack_msg = cls; + struct GNUNET_STREAM_Socket *socket = cls; if (0 == size) { @@ -633,10 +637,14 @@ send_ack_notify (void *cls, size_t size, void *buf) "%s called with size 0\n", __func__); return 0; } - GNUNET_assert (ntohs (ack_msg->header.header.size) <= size); + GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size); - size = ntohs (ack_msg->header.header.size); - memcpy (buf, ack_msg, size); + size = ntohs (socket->ack_msg->header.header.size); + memcpy (buf, socket->ack_msg, size); + + GNUNET_free (socket->ack_msg); + socket->ack_msg = NULL; + socket->ack_transmit_handle = NULL; return size; } @@ -666,7 +674,7 @@ retransmission_timeout_task (void *cls, return; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Retransmitting DATA...\n", socket->our_id); + "Retransmitting DATA...\n"); socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; write_data (socket); } @@ -691,7 +699,7 @@ ack_task (void *cls, return; } - socket->ack_task_id = 0; + socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; /* Create the ACK Message */ ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage)); @@ -703,18 +711,62 @@ ack_task (void *cls, ack_msg->receive_window_remaining = htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size); + socket->ack_msg = ack_msg; GNUNET_PEER_resolve (socket->other_peer, &target); /* Request MESH for sending ACK */ - GNUNET_MESH_notify_transmit_ready (socket->tunnel, - 0, /* Corking */ - 1, /* Priority */ - socket->retransmit_timeout, - &target, - ntohs (ack_msg->header.header.size), - &send_ack_notify, - ack_msg); + socket->ack_transmit_handle = + GNUNET_MESH_notify_transmit_ready (socket->tunnel, + 0, /* Corking */ + 1, /* Priority */ + socket->retransmit_timeout, + &target, + ntohs (ack_msg->header.header.size), + &send_ack_notify, + socket); +} - + +/** + * Retransmission task for shutdown messages + * + * @param cls the shutdown handle + * @param tc the Task Context + */ +static void +close_msg_retransmission_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls; + struct GNUNET_STREAM_MessageHeader *msg; + struct GNUNET_STREAM_Socket *socket; + + GNUNET_assert (NULL != shutdown_handle); + socket = shutdown_handle->socket; + + msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + switch (shutdown_handle->operation) + { + case SHUT_RDWR: + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); + break; + case SHUT_RD: + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); + break; + case SHUT_WR: + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); + break; + default: + GNUNET_free (msg); + shutdown_handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_NO_TASK; + return; + } + queue_message (socket, msg, NULL, NULL); + shutdown_handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, + &close_msg_retransmission_task, + shutdown_handle); } @@ -784,8 +836,7 @@ write_data (struct GNUNET_STREAM_Socket *socket) packet)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Placing DATA message with sequence %u in send queue\n", - socket->our_id, + "Placing DATA message with sequence %u in send queue\n", ntohl (io_handle->messages[packet]->sequence_number)); copy_and_queue_message (socket, @@ -803,8 +854,7 @@ write_data (struct GNUNET_STREAM_Socket *socket) socket->receiver_window_available -= ntohs (io_handle->messages[packet]->header.header.size); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Placing DATA message with sequence %u in send queue\n", - socket->our_id, + "Placing DATA message with sequence %u in send queue\n", ntohl (io_handle->messages[packet]->sequence_number)); copy_and_queue_message (socket, &io_handle->messages[packet]->header, @@ -870,20 +920,17 @@ call_read_processor (void *cls, /* Call the data processor */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Calling read processor\n", - socket->our_id); + "Calling read processor\n"); read_size = socket->read_handle->proc (socket->read_handle->proc_cls, socket->status, socket->receive_buffer + socket->copy_offset, valid_read_size); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Read processor read %d bytes\n", - socket->our_id, + "Read processor read %d bytes\n", read_size); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Read processor completed successfully\n", - socket->our_id); + "Read processor completed successfully\n"); /* Free the read handle */ GNUNET_free (socket->read_handle); @@ -906,8 +953,7 @@ call_read_processor (void *cls, sequence_increase = packet; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Sequence increase after read processor completion: %u\n", - socket->our_id, + "Sequence increase after read processor completion: %u\n", sequence_increase); /* Shift the data in the receive buffer */ @@ -964,8 +1010,7 @@ read_io_timeout (void *cls, if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Read task timedout - Cancelling it\n", - socket->our_id); + "Read task timedout - Cancelling it\n"); GNUNET_SCHEDULER_cancel (socket->read_task_id); socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; } @@ -1017,8 +1062,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket, if (GNUNET_PEER_search (sender) != socket->other_peer) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received DATA from non-confirming peer\n", - socket->our_id); + "Received DATA from non-confirming peer\n"); return GNUNET_YES; } @@ -1035,8 +1079,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket, if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Ignoring received message with sequence number %u\n", - socket->our_id, + "Ignoring received message with sequence number %u\n", ntohl (msg->sequence_number)); /* Start ACK sending task if one is not already present */ if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) @@ -1055,9 +1098,8 @@ handle_data (struct GNUNET_STREAM_Socket *socket, relative_sequence_number)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Ignoring already received message with sequence " + "Ignoring already received message with sequence " "number %u\n", - socket->our_id, ntohl (msg->sequence_number)); /* Start ACK sending task if one is not already present */ if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) @@ -1072,9 +1114,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Receiving DATA with sequence number: %u and size: %d " - "from %x\n", - socket->our_id, + "Receiving DATA with sequence number: %u and size: %d from %x\n", ntohl (msg->sequence_number), ntohs (msg->header.header.size), socket->other_peer); @@ -1094,9 +1134,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket, else { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Cannot accommodate packet %d as buffer is", - "full\n", - socket->our_id, + "Cannot accommodate packet %d as buffer is full\n", ntohl (msg->sequence_number)); return GNUNET_YES; } @@ -1134,8 +1172,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket, 0))) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Scheduling read processor\n", - socket->our_id); + "Scheduling read processor\n"); socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor, @@ -1146,8 +1183,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket, default: GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received data message when it cannot be handled\n", - socket->our_id); + "Received data message when it cannot be handled\n"); break; } return GNUNET_YES; @@ -1197,8 +1233,7 @@ set_state_established (void *cls, struct GNUNET_PeerIdentity initiator_pid; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Attaining ESTABLISHED state\n", - socket->our_id); + "Attaining ESTABLISHED state\n"); socket->write_offset = 0; socket->read_offset = 0; socket->state = STATE_ESTABLISHED; @@ -1207,8 +1242,7 @@ set_state_established (void *cls, { GNUNET_PEER_resolve (socket->other_peer, &initiator_pid); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Calling listen callback\n", - socket->our_id); + "Calling listen callback\n"); if (GNUNET_SYSERR == socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls, socket, @@ -1237,8 +1271,7 @@ set_state_hello_wait (void *cls, { GNUNET_assert (STATE_INIT == socket->state); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Attaining HELLO_WAIT state\n", - socket->our_id); + "Attaining HELLO_WAIT state\n"); socket->state = STATE_HELLO_WAIT; } @@ -1254,8 +1287,7 @@ set_state_close_wait (void *cls, struct GNUNET_STREAM_Socket *socket) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Attaing CLOSE_WAIT state\n", - socket->our_id); + "Attaing CLOSE_WAIT state\n"); socket->state = STATE_CLOSE_WAIT; GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */ socket->receive_buffer = NULL; @@ -1263,6 +1295,41 @@ set_state_close_wait (void *cls, } +/** + * Callback to set state to RECEIVE_CLOSE_WAIT + * + * @param cls the closure from queue_message + * @param socket the socket requiring state change + */ +static void +set_state_receive_close_wait (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Attaing RECEIVE_CLOSE_WAIT state\n"); + socket->state = STATE_RECEIVE_CLOSE_WAIT; + GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */ + socket->receive_buffer = NULL; + socket->receive_buffer_size = 0; +} + + +/** + * Callback to set state to TRANSMIT_CLOSE_WAIT + * + * @param cls the closure from queue_message + * @param socket the socket requiring state change + */ +static void +set_state_transmit_close_wait (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Attaining TRANSMIT_CLOSE_WAIT state\n"); + socket->state = STATE_TRANSMIT_CLOSE_WAIT; +} + + /** * Callback to set state to CLOSED * @@ -1292,8 +1359,7 @@ generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket) socket->write_sequence_number = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Generated write sequence number %u\n", - socket->our_id, + "Generated write sequence number %u\n", (unsigned int) socket->write_sequence_number); msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); @@ -1334,14 +1400,12 @@ client_handle_hello_ack (void *cls, if (GNUNET_PEER_search (sender) != socket->other_peer) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received HELLO_ACK from non-confirming peer\n", - socket->our_id); + "Received HELLO_ACK from non-confirming peer\n"); return GNUNET_YES; } ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received HELLO_ACK from %x\n", - socket->our_id, + "Received HELLO_ACK from %x\n", socket->other_peer); GNUNET_assert (socket->tunnel == tunnel); @@ -1350,8 +1414,7 @@ client_handle_hello_ack (void *cls, case STATE_HELLO_WAIT: socket->read_sequence_number = ntohl (ack_msg->sequence_number); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Read sequence number %u\n", - socket->our_id, + "Read sequence number %u\n", (unsigned int) socket->read_sequence_number); socket->receiver_window_available = ntohl (ack_msg->receiver_window_size); reply = generate_hello_ack_msg (socket); @@ -1367,8 +1430,7 @@ client_handle_hello_ack (void *cls, case STATE_INIT: default: GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Server %x sent HELLO_ACK when in state %d\n", - socket->our_id, + "Server %x sent HELLO_ACK when in state %d\n", socket->other_peer, socket->state); socket->state = STATE_CLOSED; // introduce STATE_ERROR? @@ -1398,7 +1460,7 @@ client_handle_reset (void *cls, const struct GNUNET_MessageHeader *message, const struct GNUNET_ATS_Information*atsi) { - struct GNUNET_STREAM_Socket *socket = cls; + // struct GNUNET_STREAM_Socket *socket = cls; return GNUNET_OK; } @@ -1475,6 +1537,128 @@ client_handle_transmit_close (void *cls, } +/** + * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages + * + * @param socket the socket + * @param tunnel connection to the other end + * @param sender who sent the message + * @param message the actual message + * @param atsi performance data for the connection + * @param operation the close operation which is being ACK'ed + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_STREAM_MessageHeader *message, + const struct GNUNET_ATS_Information *atsi, + int operation) +{ + struct GNUNET_STREAM_ShutdownHandle *shutdown_handle; + + shutdown_handle = socket->shutdown_handle; + if (NULL == shutdown_handle) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received *CLOSE_ACK when shutdown handle is NULL\n"); + return GNUNET_OK; + } + + switch (operation) + { + case SHUT_RDWR: + switch (socket->state) + { + case STATE_CLOSE_WAIT: + if (SHUT_RDWR != shutdown_handle->operation) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received CLOSE_ACK when shutdown handle is not for SHUT_RDWR\n"); + return GNUNET_OK; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received CLOSE_ACK from %x\n", + socket->other_peer); + socket->state = STATE_CLOSED; + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received CLOSE_ACK when in it not expected\n"); + return GNUNET_OK; + } + break; + + case SHUT_RD: + switch (socket->state) + { + case STATE_RECEIVE_CLOSE_WAIT: + if (SHUT_RD != shutdown_handle->operation) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received RECEIVE_CLOSE_ACK when shutdown handle is not for SHUT_RD\n"); + return GNUNET_OK; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received RECEIVE_CLOSE_ACK from %x\n", + socket->other_peer); + socket->state = STATE_RECEIVE_CLOSED; + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received RECEIVE_CLOSE_ACK when in it not expected\n"); + return GNUNET_OK; + } + + break; + case SHUT_WR: + switch (socket->state) + { + case STATE_TRANSMIT_CLOSE_WAIT: + if (SHUT_WR != shutdown_handle->operation) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received TRANSMIT_CLOSE_ACK when shutdown handle is not for SHUT_WR\n"); + return GNUNET_OK; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received TRANSMIT_CLOSE_ACK from %x\n", + socket->other_peer); + socket->state = STATE_TRANSMIT_CLOSED; + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received TRANSMIT_CLOSE_ACK when in it not expected\n"); + + return GNUNET_OK; + } + break; + default: + GNUNET_assert (0); + } + + if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */ + shutdown_handle->completion_cb(shutdown_handle->completion_cls, + operation); + GNUNET_free (shutdown_handle); /* Free shutdown handle */ + socket->shutdown_handle = NULL; + if (GNUNET_SCHEDULER_NO_TASK + != shutdown_handle->close_msg_retransmission_task_id) + { + GNUNET_SCHEDULER_cancel + (shutdown_handle->close_msg_retransmission_task_id); + shutdown_handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_NO_TASK; + } + return GNUNET_OK; +} + + /** * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK * @@ -1497,6 +1681,65 @@ client_handle_transmit_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_WR); +} + + +/** + * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE + * + * @param socket the socket + * @param tunnel connection to the other end + * @param sender who sent the message + * @param message the actual message + * @param atsi performance data for the connection + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_receive_close (struct GNUNET_STREAM_Socket *socket, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_STREAM_MessageHeader *message, + const struct GNUNET_ATS_Information *atsi) +{ + struct GNUNET_STREAM_MessageHeader *receive_close_ack; + + switch (socket->state) + { + case STATE_INIT: + case STATE_LISTEN: + case STATE_HELLO_WAIT: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Ignoring RECEIVE_CLOSE as it cannot be handled now\n"); + return GNUNET_OK; + default: + break; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received RECEIVE_CLOSE from %x\n", + socket->other_peer); + receive_close_ack = + GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + receive_close_ack->header.size = + htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + receive_close_ack->header.type = + htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK); + queue_message (socket, + receive_close_ack, + &set_state_closed, + NULL); + + /* FIXME: Handle the case where write handle is present; the write operation + should be deemed as finised and the write continuation callback + has to be called with the stream status GNUNET_STREAM_SHUTDOWN */ return GNUNET_OK; } @@ -1523,7 +1766,12 @@ client_handle_receive_close (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; - return GNUNET_OK; + return + handle_receive_close (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) message, + atsi); } @@ -1549,7 +1797,13 @@ client_handle_receive_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; - return GNUNET_OK; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_RD); } @@ -1558,7 +1812,6 @@ client_handle_receive_close_ack (void *cls, * * @param socket the socket * @param tunnel connection to the other end - * @param tunnel_ctx this is NULL * @param sender who sent the message * @param message the actual message * @param atsi performance data for the connection @@ -1574,9 +1827,20 @@ handle_close (struct GNUNET_STREAM_Socket *socket, { struct GNUNET_STREAM_MessageHeader *close_ack; + switch (socket->state) + { + case STATE_INIT: + case STATE_LISTEN: + case STATE_HELLO_WAIT: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Ignoring RECEIVE_CLOSE as it cannot be handled now\n"); + return GNUNET_OK; + default: + break; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received CLOSE from %x\n", - socket->our_id, + "Received CLOSE from %x\n", socket->other_peer); close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); @@ -1625,60 +1889,6 @@ client_handle_close (void *cls, } -/** - * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK - * - * @param socket the socket - * @param tunnel connection to the other end - * @param tunnel_ctx this is NULL - * @param sender who sent the message - * @param message the actual message - * @param atsi performance data for the connection - * @return GNUNET_OK to keep the connection open, - * GNUNET_SYSERR to close it (signal serious error) - */ -static int -handle_close_ack (struct GNUNET_STREAM_Socket *socket, - struct GNUNET_MESH_Tunnel *tunnel, - const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_STREAM_MessageHeader *message, - const struct GNUNET_ATS_Information*atsi) -{ - struct GNUNET_STREAM_ShutdownHandle *shutdown_handle; - - shutdown_handle = socket->shutdown_handle; - switch (socket->state) - { - case STATE_CLOSE_WAIT: - socket->state = STATE_CLOSED; - if ( (NULL == shutdown_handle) || - (SHUT_RDWR != shutdown_handle->operation) ) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received CLOSE_ACK when shutdown handle is NULL or " - "not for SHUT_RDWR\n", - socket->our_id); - return GNUNET_OK; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received CLOSE_ACK from %x\n", - socket->our_id, - socket->other_peer); - if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */ - shutdown_handle->completion_cb(shutdown_handle->completion_cls, - SHUT_RDWR); - GNUNET_free (shutdown_handle); /* Free shutdown handle */ - break; - default: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received CLOSE_ACK when in it not expected\n", - socket->our_id); - break; - } - return GNUNET_OK; -} - - /** * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK * @@ -1697,16 +1907,17 @@ client_handle_close_ack (void *cls, void **tunnel_ctx, const struct GNUNET_PeerIdentity *sender, const struct GNUNET_MessageHeader *message, - const struct GNUNET_ATS_Information*atsi) + const struct GNUNET_ATS_Information *atsi) { struct GNUNET_STREAM_Socket *socket = cls; - return handle_close_ack (socket, - tunnel, - sender, - (const struct GNUNET_STREAM_MessageHeader *) - message, - atsi); + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_RDWR); } /*****************************/ @@ -1769,8 +1980,7 @@ server_handle_hello (void *cls, if (GNUNET_PEER_search (sender) != socket->other_peer) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received HELLO from non-confirming peer\n", - socket->our_id); + "Received HELLO from non-confirming peer\n"); return GNUNET_YES; } @@ -1778,8 +1988,7 @@ server_handle_hello (void *cls, ntohs (message->type)); GNUNET_assert (socket->tunnel == tunnel); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received HELLO from %x\n", - socket->our_id, + "Received HELLO from %x\n", socket->other_peer); if (STATE_INIT == socket->state) @@ -1831,13 +2040,11 @@ server_handle_hello_ack (void *cls, if (STATE_HELLO_WAIT == socket->state) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received HELLO_ACK from %x\n", - socket->our_id, + "Received HELLO_ACK from %x\n", socket->other_peer); socket->read_sequence_number = ntohl (ack_message->sequence_number); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Read sequence number %u\n", - socket->our_id, + "Read sequence number %u\n", (unsigned int) socket->read_sequence_number); socket->receiver_window_available = ntohl (ack_message->receiver_window_size); @@ -1875,7 +2082,7 @@ server_handle_reset (void *cls, const struct GNUNET_MessageHeader *message, const struct GNUNET_ATS_Information*atsi) { - struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; + // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; return GNUNET_OK; } @@ -1933,7 +2140,13 @@ server_handle_transmit_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return GNUNET_OK; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_WR); } @@ -1959,7 +2172,12 @@ server_handle_receive_close (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return GNUNET_OK; + return + handle_receive_close (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) message, + atsi); } @@ -1985,7 +2203,13 @@ server_handle_receive_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return GNUNET_OK; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_RD); } @@ -2042,16 +2266,18 @@ server_handle_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return handle_close_ack (socket, - tunnel, - sender, - (const struct GNUNET_STREAM_MessageHeader *) message, - atsi); + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_RDWR); } /** - * Message Handler for mesh + * Handler for DATA_ACK messages * * @param socket the socket through which the ack was received * @param tunnel connection to the other end @@ -2075,19 +2301,19 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, if (GNUNET_PEER_search (sender) != socket->other_peer) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received ACK from non-confirming peer\n", - socket->our_id); + "Received ACK from non-confirming peer\n"); return GNUNET_YES; } switch (socket->state) { case (STATE_ESTABLISHED): + case (STATE_RECEIVE_CLOSED): + case (STATE_RECEIVE_CLOSE_WAIT): if (NULL == socket->write_handle) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received DATA_ACK when write_handle is NULL\n", - socket->our_id); + "Received DATA_ACK when write_handle is NULL\n"); return GNUNET_OK; } /* FIXME: increment in the base sequence number is breaking current flow @@ -2096,12 +2322,9 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received DATA_ACK with unexpected base sequence " - "number\n", - socket->our_id); + "Received DATA_ACK with unexpected base sequence number\n"); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Current write sequence: %u; Ack's base sequence: %u\n", - socket->our_id, + "Current write sequence: %u; Ack's base sequence: %u\n", socket->write_sequence_number, ntohl (ack->base_sequence_number)); return GNUNET_OK; @@ -2110,8 +2333,7 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, acks */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Received DATA_ACK from %x\n", - socket->our_id, + "Received DATA_ACK from %x\n", socket->other_peer); /* Cancel the retransmission task */ @@ -2175,8 +2397,7 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, socket->status, socket->write_handle->size); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Write completion callback completed\n", - socket->our_id); + "Write completion callback completed\n"); /* We are done with the write handle - Freeing it */ GNUNET_free (socket->write_handle); socket->write_handle = NULL; @@ -2190,7 +2411,7 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, /** - * Message Handler for mesh + * Handler for DATA_ACK messages * * @param cls the 'struct GNUNET_STREAM_Socket' * @param tunnel connection to the other end @@ -2217,7 +2438,7 @@ client_handle_ack (void *cls, /** - * Message Handler for mesh + * Handler for DATA_ACK messages * * @param cls the server's listen socket * @param tunnel connection to the other end @@ -2322,15 +2543,12 @@ mesh_peer_connect_callback (void *cls, if (connected_peer != socket->other_peer) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: A peer which is not our target has connected", - "to our tunnel\n", - socket->our_id); + "A peer which is not our target has connected to our tunnel\n"); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Target peer %x connected\n", - socket->our_id, + "Target peer %x connected\n", connected_peer); /* Set state to INIT */ @@ -2364,7 +2582,12 @@ static void mesh_peer_disconnect_callback (void *cls, const struct GNUNET_PeerIdentity *peer) { - + struct GNUNET_STREAM_Socket *socket=cls; + + /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Other peer %x disconnected\n", + socket->other_peer); } @@ -2395,12 +2618,9 @@ new_tunnel_notify (void *cls, socket->tunnel = tunnel; socket->session_id = 0; /* FIXME */ socket->state = STATE_INIT; - socket->lsocket = lsocket; - socket->our_id = lsocket->our_id; - + socket->lsocket = lsocket; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Peer %x initiated tunnel to us\n", - socket->our_id, + "Peer %x initiated tunnel to us\n", socket->other_peer); /* FIXME: Copy MESH handle from lsocket to socket */ @@ -2427,10 +2647,13 @@ tunnel_cleaner (void *cls, void *tunnel_ctx) { struct GNUNET_STREAM_Socket *socket = tunnel_ctx; - + + if (tunnel != socket->tunnel) + return; + + GNUNET_break_op(0); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: Peer %x has terminated connection abruptly\n", - socket->our_id, + "Peer %x has terminated connection abruptly\n", socket->other_peer); socket->status = GNUNET_STREAM_SHUTDOWN; @@ -2441,6 +2664,25 @@ tunnel_cleaner (void *cls, GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); socket->transmit_handle = NULL; } + if (NULL != socket->ack_transmit_handle) + { + GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle); + GNUNET_free (socket->ack_msg); + socket->ack_msg = NULL; + socket->ack_transmit_handle = NULL; + } + /* Stop Tasks using socket->tunnel */ + if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id) + { + GNUNET_SCHEDULER_cancel (socket->ack_task_id); + socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; + } + if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) + { + GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); + socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + } + /* FIXME: Cancel all other tasks using socket->tunnel */ socket->tunnel = NULL; } @@ -2472,8 +2714,8 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, ...) { struct GNUNET_STREAM_Socket *socket; - struct GNUNET_PeerIdentity own_peer_id; enum GNUNET_STREAM_Option option; + GNUNET_MESH_ApplicationType ports[] = {app_port, 0}; va_list vargs; /* Variable arguments */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -2483,9 +2725,6 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, socket->other_peer = GNUNET_PEER_intern (target); socket->open_cb = open_cb; socket->open_cls = open_cb_cls; - GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id); - socket->our_id = GNUNET_PEER_intern (&own_peer_id); - /* Set defaults */ socket->retransmit_timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout); @@ -2509,9 +2748,9 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, 10, /* QUEUE size as parameter? */ socket, /* cls */ NULL, /* No inbound tunnel handler */ - &tunnel_cleaner, /* FIXME: not required? */ + NULL, /* No in-tunnel cleaner */ client_message_handlers, - &app_port); /* We don't get inbound tunnels */ + ports); /* We don't get inbound tunnels */ if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */ { GNUNET_free (socket); @@ -2573,10 +2812,23 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Existing read handle should be cancelled before shutting" " down reading\n"); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); + queue_message (socket, + msg, + &set_state_receive_close_wait, + NULL); break; case SHUT_WR: handle->operation = SHUT_WR; - + if (NULL != socket->write_handle) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Existing write handle should be cancelled before shutting" + " down writing\n"); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); + queue_message (socket, + msg, + &set_state_transmit_close_wait, + NULL); break; case SHUT_RDWR: handle->operation = SHUT_RDWR; @@ -2598,9 +2850,14 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "GNUNET_STREAM_shutdown called with invalid value for " "parameter operation -- Ignoring\n"); + GNUNET_free (msg); GNUNET_free (handle); return NULL; } + handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, + &close_msg_retransmission_task, + handle); return handle; } @@ -2613,6 +2870,9 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, void GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle) { + if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id) + GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id); + GNUNET_free (handle); return; } @@ -2651,6 +2911,13 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); socket->transmit_handle = NULL; } + if (NULL != socket->ack_transmit_handle) + { + GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle); + GNUNET_free (socket->ack_msg); + socket->ack_msg = NULL; + socket->ack_transmit_handle = NULL; + } /* Clear existing message queue */ while (NULL != (head = socket->queue_head)) { @@ -2703,21 +2970,19 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, { /* FIXME: Add variable args for passing configration options? */ struct GNUNET_STREAM_ListenSocket *lsocket; - struct GNUNET_PeerIdentity our_peer_id; + GNUNET_MESH_ApplicationType ports[] = {app_port, 0}; lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket)); lsocket->port = app_port; lsocket->listen_cb = listen_cb; lsocket->listen_cb_cls = listen_cb_cls; - GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id); - lsocket->our_id = GNUNET_PEER_intern (&our_peer_id); lsocket->mesh = GNUNET_MESH_connect (cfg, 10, /* FIXME: QUEUE size as parameter? */ lsocket, /* Closure */ &new_tunnel_notify, &tunnel_cleaner, server_message_handlers, - &app_port); + ports); GNUNET_assert (NULL != lsocket->mesh); return lsocket; } @@ -2740,15 +3005,22 @@ GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket) /** - * Tries to write the given data to the stream + * Tries to write the given data to the stream. The maximum size of data that + * can be written as part of a write operation is (64 * (64000 - sizeof (struct + * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API + * violation, however only the said number of maximum bytes will be written. * * @param socket the socket representing a stream * @param data the data buffer from where the data is written into the stream * @param size the number of bytes to be written from the data buffer * @param timeout the timeout period - * @param write_cont the function to call upon writing some bytes into the stream + * @param write_cont the function to call upon writing some bytes into the + * stream * @param write_cont_cls the closure - * @return handle to cancel the operation + * + * @return handle to cancel the operation; if a previous write is pending or + * the stream has been shutdown for this operation then write_cont is + * immediately called and NULL is returned. */ struct GNUNET_STREAM_IOWriteHandle * GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, @@ -2776,16 +3048,33 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, GNUNET_break (0); return NULL; } - if (!((STATE_ESTABLISHED == socket->state) - || (STATE_RECEIVE_CLOSE_WAIT == socket->state) - || (STATE_RECEIVE_CLOSED == socket->state))) + + switch (socket->state) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%x: Attempting to write on a closed (OR) not-yet-established" - "stream\n", - socket->our_id); + case STATE_TRANSMIT_CLOSED: + case STATE_TRANSMIT_CLOSE_WAIT: + case STATE_CLOSED: + case STATE_CLOSE_WAIT: + if (NULL != write_cont) + write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", __func__); return NULL; - } + case STATE_INIT: + case STATE_LISTEN: + case STATE_HELLO_WAIT: + if (NULL != write_cont) + /* FIXME: GNUNET_STREAM_SYSERR?? */ + write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", __func__); + return NULL; + case STATE_ESTABLISHED: + case STATE_RECEIVE_CLOSED: + case STATE_RECEIVE_CLOSE_WAIT: + break; + } + if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size) size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size; num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size; @@ -2842,14 +3131,18 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, } + /** - * Tries to read data from the stream + * Tries to read data from the stream. * * @param socket the socket representing a stream * @param timeout the timeout period * @param proc function to call with data (once only) * @param proc_cls the closure for proc - * @return handle to cancel the operation + * + * @return handle to cancel the operation; if the stream has been shutdown for + * this type of opeartion then the DataProcessor is immediately + * called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned */ struct GNUNET_STREAM_IOReadHandle * GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, @@ -2860,8 +3153,7 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, struct GNUNET_STREAM_IOReadHandle *read_handle; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: %s()\n", - socket->our_id, + "%s()\n", __func__); /* Return NULL if there is already a read handle; the user has to cancel that @@ -2870,6 +3162,21 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, GNUNET_assert (NULL != proc); + switch (socket->state) + { + case STATE_RECEIVE_CLOSED: + case STATE_RECEIVE_CLOSE_WAIT: + case STATE_CLOSED: + case STATE_CLOSE_WAIT: + proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", + __func__); + return NULL; + default: + break; + } + read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle)); read_handle->proc = proc; read_handle->proc_cls = proc_cls; @@ -2890,8 +3197,7 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, &read_io_timeout, socket); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%x: %s() END\n", - socket->our_id, + "%s() END\n", __func__); return read_handle; }