*
* Decrement PEER intern count during socket close and listen close to free the
* memory used for PEER interning
+ *
+ * Add code for write io timeout
+ *
+ * Include retransmission for control messages
**/
/**
* @brief Implementation of the stream library
* @author Sree Harsha Totakura
*/
+
+
#include "platform.h"
#include "gnunet_common.h"
#include "gnunet_crypto_lib.h"
#define MAX_PACKET_SIZE 64000
/**
- * The maximum payload a data message packet can carry
+ * Receive buffer
*/
-static size_t max_payload_size =
- MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
+#define RECEIVE_BUFFER_SIZE 4096000
/**
- * Receive buffer
+ * The maximum payload a data message packet can carry
*/
-#define RECEIVE_BUFFER_SIZE 4096000
+static size_t max_payload_size =
+ MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
/**
* states in the Protocol
*/
struct GNUNET_MESH_TransmitHandle *transmit_handle;
+ /**
+ * The current act transmit handle (if a pending ack transmit request exists)
+ */
+ struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
+
+ /**
+ * Pointer to the current ack message using in ack_task
+ */
+ struct GNUNET_STREAM_AckMessage *ack_msg;
+
/**
* The current message associated with the transmit handle
*/
*/
struct GNUNET_STREAM_IOReadHandle *read_handle;
+ /**
+ * The shutdown handle associated with this socket
+ */
+ struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
+
/**
* Buffer for storing received messages
*/
/**
* Task identifier for the read io timeout task
*/
- GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
+ GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
/**
* Task identifier for retransmission task after timeout
*/
GNUNET_PEER_Id other_peer;
- /**
- * Our Peer Identity (for debugging)
- */
- GNUNET_PEER_Id our_id;
-
/**
* The application port number (type: uint32_t)
*/
*/
void *listen_cb_cls;
- /**
- * Our interned Peer's identity
- */
- GNUNET_PEER_Id our_id;
-
/**
* The service port
* FIXME: Remove if not required!
*/
struct GNUNET_STREAM_IOWriteHandle
{
+ /**
+ * The socket to which this write handle is associated
+ */
+ struct GNUNET_STREAM_Socket *socket;
+
/**
* The packet_buffers associated with this Handle
*/
* Number of bytes in this write handle
*/
size_t size;
-
- /**
- * Number of packets sent before waiting for an ack
- *
- * FIXME: Do we need this?
- */
- unsigned int sent_packets;
};
};
+/**
+ * Handle for Shutdown
+ */
+struct GNUNET_STREAM_ShutdownHandle
+{
+ /**
+ * The socket associated with this shutdown handle
+ */
+ struct GNUNET_STREAM_Socket *socket;
+
+ /**
+ * Shutdown completion callback
+ */
+ GNUNET_STREAM_ShutdownCompletion completion_cb;
+
+ /**
+ * Closure for completion callback
+ */
+ void *completion_cls;
+
+ /**
+ * Close message retransmission task id
+ */
+ GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
+
+ /**
+ * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
+ */
+ int operation;
+};
+
+
/**
* Default value in seconds for various timeouts
*/
static unsigned int default_timeout = 10;
+
/**
- * Callback function for sending hello message
+ * Callback function for sending queued message
*
* @param cls closure the socket
* @param size number of bytes available in buf
&& (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Queueing message of type %d and size %d\n",
- socket->our_id,
+ "Queueing message of type %d and size %d\n",
ntohs (message->header.type),
ntohs (message->header.size));
GNUNET_assert (NULL != message);
static size_t
send_ack_notify (void *cls, size_t size, void *buf)
{
- struct GNUNET_STREAM_AckMessage *ack_msg = cls;
+ struct GNUNET_STREAM_Socket *socket = cls;
if (0 == size)
{
"%s called with size 0\n", __func__);
return 0;
}
- GNUNET_assert (ntohs (ack_msg->header.header.size) <= size);
+ GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
+
+ size = ntohs (socket->ack_msg->header.header.size);
+ memcpy (buf, socket->ack_msg, size);
- size = ntohs (ack_msg->header.header.size);
- memcpy (buf, ack_msg, size);
+ GNUNET_free (socket->ack_msg);
+ socket->ack_msg = NULL;
+ socket->ack_transmit_handle = NULL;
return size;
}
return;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Retransmitting DATA...\n", socket->our_id);
+ "Retransmitting DATA...\n");
socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
write_data (socket);
}
return;
}
- socket->ack_task_id = 0;
+ socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
/* Create the ACK Message */
ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
ack_msg->receive_window_remaining =
htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
+ socket->ack_msg = ack_msg;
GNUNET_PEER_resolve (socket->other_peer, &target);
/* Request MESH for sending ACK */
- GNUNET_MESH_notify_transmit_ready (socket->tunnel,
- 0, /* Corking */
- 1, /* Priority */
- socket->retransmit_timeout,
- &target,
- ntohs (ack_msg->header.header.size),
- &send_ack_notify,
- ack_msg);
+ socket->ack_transmit_handle =
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ 0, /* Corking */
+ 1, /* Priority */
+ socket->retransmit_timeout,
+ &target,
+ ntohs (ack_msg->header.header.size),
+ &send_ack_notify,
+ socket);
+}
-
+
+/**
+ * Retransmission task for shutdown messages
+ *
+ * @param cls the shutdown handle
+ * @param tc the Task Context
+ */
+static void
+close_msg_retransmission_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
+ struct GNUNET_STREAM_MessageHeader *msg;
+ struct GNUNET_STREAM_Socket *socket;
+
+ GNUNET_assert (NULL != shutdown_handle);
+ socket = shutdown_handle->socket;
+
+ msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ switch (shutdown_handle->operation)
+ {
+ case SHUT_RDWR:
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
+ break;
+ case SHUT_RD:
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
+ break;
+ case SHUT_WR:
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
+ break;
+ default:
+ GNUNET_free (msg);
+ shutdown_handle->close_msg_retransmission_task_id =
+ GNUNET_SCHEDULER_NO_TASK;
+ return;
+ }
+ queue_message (socket, msg, NULL, NULL);
+ shutdown_handle->close_msg_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+ &close_msg_retransmission_task,
+ shutdown_handle);
}
}
-
-/**
- * Function called when Data Message is sent
- *
- * @param cls the io_handle corresponding to the Data Message
- * @param socket the socket which was used
- */
-static void
-write_data_finish_cb (void *cls,
- struct GNUNET_STREAM_Socket *socket)
-{
- struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
-
- io_handle->sent_packets++;
-}
-
-
/**
* Writes data using the given socket. The amount of data written is limited by
* the receiver_window_size
packet))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Placing DATA message with sequence %u in send queue\n",
- socket->our_id,
+ "Placing DATA message with sequence %u in send queue\n",
ntohl (io_handle->messages[packet]->sequence_number));
copy_and_queue_message (socket,
packet = ack_packet + 1;
/* Now send new packets if there is enough buffer space */
while ( (NULL != io_handle->messages[packet]) &&
- (socket->receiver_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
+ (socket->receiver_window_available
+ >= ntohs (io_handle->messages[packet]->header.header.size)) )
{
socket->receiver_window_available -=
ntohs (io_handle->messages[packet]->header.header.size);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Placing DATA message with sequence %u in send queue\n",
- socket->our_id,
+ "Placing DATA message with sequence %u in send queue\n",
ntohl (io_handle->messages[packet]->sequence_number));
copy_and_queue_message (socket,
&io_handle->messages[packet]->header,
- &write_data_finish_cb,
- io_handle);
+ NULL,
+ NULL);
packet++;
}
if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
socket->retransmission_timeout_task_id =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
+ (GNUNET_TIME_UNIT_SECONDS, 8),
&retransmission_timeout_task,
socket);
}
*/
static void
call_read_processor (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_STREAM_Socket *socket = cls;
size_t read_size;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
+ if (NULL == socket->receive_buffer)
+ return;
+
GNUNET_assert (NULL != socket->read_handle);
GNUNET_assert (NULL != socket->read_handle->proc);
GNUNET_assert (0 != valid_read_size);
/* Cancel the read_io_timeout_task */
- GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
- socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
+ socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
/* Call the data processor */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Calling read processor\n",
- socket->our_id);
+ "Calling read processor\n");
read_size =
socket->read_handle->proc (socket->read_handle->proc_cls,
socket->status,
socket->receive_buffer + socket->copy_offset,
valid_read_size);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Read processor read %d bytes\n",
- socket->our_id,
+ "Read processor read %d bytes\n",
read_size);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Read processor completed successfully\n",
- socket->our_id);
+ "Read processor completed successfully\n");
/* Free the read handle */
GNUNET_free (socket->read_handle);
/* Determine upto which packet we can remove from the buffer */
for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
- if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
- break;
+ {
+ if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
+ { packet++; break; }
+ if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
+ break;
+ }
/* If no packets can be removed we can't move the buffer */
if (0 == packet) return;
sequence_increase = packet;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sequence increase after read processor completion: %u\n",
+ sequence_increase);
/* Shift the data in the receive buffer */
memmove (socket->receive_buffer,
socket->receive_buffer
+ socket->receive_buffer_boundaries[sequence_increase-1],
- socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
+ socket->receive_buffer_size
+ - socket->receive_buffer_boundaries[sequence_increase-1]);
/* Shift the bitmap */
socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
GNUNET_STREAM_DataProcessor proc;
void *proc_cls;
- socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Read task timedout - Cancelling it\n",
- socket->our_id);
+ "Read task timedout - Cancelling it\n");
GNUNET_SCHEDULER_cancel (socket->read_task_id);
socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
}
if (GNUNET_PEER_search (sender) != socket->other_peer)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Received DATA from non-confirming peer\n",
- socket->our_id);
+ "Received DATA from non-confirming peer\n");
return GNUNET_YES;
}
if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Ignoring received message with sequence number %u\n",
- socket->our_id,
+ "Ignoring received message with sequence number %u\n",
ntohl (msg->sequence_number));
/* Start ACK sending task if one is not already present */
if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
relative_sequence_number))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Ignoring already received message with sequence "
+ "Ignoring already received message with sequence "
"number %u\n",
- socket->our_id,
ntohl (msg->sequence_number));
/* Start ACK sending task if one is not already present */
if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Receiving DATA with sequence number: %u and size: %d "
- "from %x\n",
- socket->our_id,
+ "Receiving DATA with sequence number: %u and size: %d from %x\n",
ntohl (msg->sequence_number),
ntohs (msg->header.header.size),
socket->other_peer);
else
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Cannot accommodate packet %d as buffer is",
- "full\n",
- socket->our_id,
+ "Cannot accommodate packet %d as buffer is full\n",
ntohl (msg->sequence_number));
return GNUNET_YES;
}
0)))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Scheduling read processor\n",
- socket->our_id);
+ "Scheduling read processor\n");
socket->read_task_id =
GNUNET_SCHEDULER_add_now (&call_read_processor,
default:
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Received data message when it cannot be handled\n",
- socket->our_id);
+ "Received data message when it cannot be handled\n");
break;
}
return GNUNET_YES;
}
+
/**
* Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
*
struct GNUNET_PeerIdentity initiator_pid;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Attaining ESTABLISHED state\n",
- socket->our_id);
+ "Attaining ESTABLISHED state\n");
socket->write_offset = 0;
socket->read_offset = 0;
socket->state = STATE_ESTABLISHED;
{
GNUNET_PEER_resolve (socket->other_peer, &initiator_pid);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Calling listen callback\n",
- socket->our_id);
+ "Calling listen callback\n");
if (GNUNET_SYSERR ==
socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
socket,
{
GNUNET_assert (STATE_INIT == socket->state);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Attaining HELLO_WAIT state\n",
- socket->our_id);
+ "Attaining HELLO_WAIT state\n");
socket->state = STATE_HELLO_WAIT;
}
+/**
+ * Callback to set state to CLOSE_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_close_wait (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Attaing CLOSE_WAIT state\n");
+ socket->state = STATE_CLOSE_WAIT;
+ GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
+ socket->receive_buffer = NULL;
+ socket->receive_buffer_size = 0;
+}
+
+
+/**
+ * Callback to set state to RECEIVE_CLOSE_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_receive_close_wait (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Attaing RECEIVE_CLOSE_WAIT state\n");
+ socket->state = STATE_RECEIVE_CLOSE_WAIT;
+ GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
+ socket->receive_buffer = NULL;
+ socket->receive_buffer_size = 0;
+}
+
+
+/**
+ * Callback to set state to TRANSMIT_CLOSE_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_transmit_close_wait (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Attaining TRANSMIT_CLOSE_WAIT state\n");
+ socket->state = STATE_TRANSMIT_CLOSE_WAIT;
+}
+
+
+/**
+ * Callback to set state to CLOSED
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_closed (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ socket->state = STATE_CLOSED;
+}
+
+/**
+ * Returns a new HelloAckMessage. Also sets the write sequence number for the
+ * socket
+ *
+ * @param socket the socket for which this HelloAckMessage has to be generated
+ * @return the HelloAckMessage
+ */
+static struct GNUNET_STREAM_HelloAckMessage *
+generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
+{
+ struct GNUNET_STREAM_HelloAckMessage *msg;
+
+ /* Get the random sequence number */
+ socket->write_sequence_number =
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Generated write sequence number %u\n",
+ (unsigned int) socket->write_sequence_number);
+
+ msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
+ msg->header.header.size =
+ htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
+ msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
+ msg->sequence_number = htonl (socket->write_sequence_number);
+ msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
+
+ return msg;
+}
+
+
/**
* Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
*
if (GNUNET_PEER_search (sender) != socket->other_peer)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Received HELLO_ACK from non-confirming peer\n",
- socket->our_id);
+ "Received HELLO_ACK from non-confirming peer\n");
return GNUNET_YES;
}
ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Received HELLO_ACK from %x\n",
- socket->our_id,
+ "Received HELLO_ACK from %x\n",
socket->other_peer);
GNUNET_assert (socket->tunnel == tunnel);
case STATE_HELLO_WAIT:
socket->read_sequence_number = ntohl (ack_msg->sequence_number);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Read sequence number %u\n",
- socket->our_id,
+ "Read sequence number %u\n",
(unsigned int) socket->read_sequence_number);
socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
- /* Get the random sequence number */
- socket->write_sequence_number =
- GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Generated write sequence number %u\n",
- socket->our_id,
- (unsigned int) socket->write_sequence_number);
- reply =
- GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
- reply->header.header.size =
- htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
- reply->header.header.type =
- htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
- reply->sequence_number = htonl (socket->write_sequence_number);
- reply->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
- queue_message (socket,
+ reply = generate_hello_ack_msg (socket);
+ queue_message (socket,
&reply->header,
&set_state_established,
NULL);
case STATE_INIT:
default:
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Server %x sent HELLO_ACK when in state %d\n",
- socket->our_id,
+ "Server %x sent HELLO_ACK when in state %d\n",
socket->other_peer,
socket->state);
socket->state = STATE_CLOSED; // introduce STATE_ERROR?
const struct GNUNET_MessageHeader *message,
const struct GNUNET_ATS_Information*atsi)
{
- struct GNUNET_STREAM_Socket *socket = cls;
+ // struct GNUNET_STREAM_Socket *socket = cls;
return GNUNET_OK;
}
}
+/**
+ * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
+ *
+ * @param socket the socket
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @param operation the close operation which is being ACK'ed
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_STREAM_MessageHeader *message,
+ const struct GNUNET_ATS_Information *atsi,
+ int operation)
+{
+ struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
+
+ shutdown_handle = socket->shutdown_handle;
+ if (NULL == shutdown_handle)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received *CLOSE_ACK when shutdown handle is NULL\n");
+ return GNUNET_OK;
+ }
+
+ switch (operation)
+ {
+ case SHUT_RDWR:
+ switch (socket->state)
+ {
+ case STATE_CLOSE_WAIT:
+ if (SHUT_RDWR != shutdown_handle->operation)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received CLOSE_ACK when shutdown handle is not for SHUT_RDWR\n");
+ return GNUNET_OK;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received CLOSE_ACK from %x\n",
+ socket->other_peer);
+ socket->state = STATE_CLOSED;
+ break;
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received CLOSE_ACK when in it not expected\n");
+ return GNUNET_OK;
+ }
+ break;
+
+ case SHUT_RD:
+ switch (socket->state)
+ {
+ case STATE_RECEIVE_CLOSE_WAIT:
+ if (SHUT_RD != shutdown_handle->operation)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received RECEIVE_CLOSE_ACK when shutdown handle is not for SHUT_RD\n");
+ return GNUNET_OK;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received RECEIVE_CLOSE_ACK from %x\n",
+ socket->other_peer);
+ socket->state = STATE_RECEIVE_CLOSED;
+ break;
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received RECEIVE_CLOSE_ACK when in it not expected\n");
+ return GNUNET_OK;
+ }
+
+ break;
+ case SHUT_WR:
+ switch (socket->state)
+ {
+ case STATE_TRANSMIT_CLOSE_WAIT:
+ if (SHUT_WR != shutdown_handle->operation)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received TRANSMIT_CLOSE_ACK when shutdown handle is not for SHUT_WR\n");
+ return GNUNET_OK;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received TRANSMIT_CLOSE_ACK from %x\n",
+ socket->other_peer);
+ socket->state = STATE_TRANSMIT_CLOSED;
+ break;
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received TRANSMIT_CLOSE_ACK when in it not expected\n");
+
+ return GNUNET_OK;
+ }
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+
+ if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
+ shutdown_handle->completion_cb(shutdown_handle->completion_cls,
+ operation);
+ GNUNET_free (shutdown_handle); /* Free shutdown handle */
+ socket->shutdown_handle = NULL;
+ if (GNUNET_SCHEDULER_NO_TASK
+ != shutdown_handle->close_msg_retransmission_task_id)
+ {
+ GNUNET_SCHEDULER_cancel
+ (shutdown_handle->close_msg_retransmission_task_id);
+ shutdown_handle->close_msg_retransmission_task_id =
+ GNUNET_SCHEDULER_NO_TASK;
+ }
+ return GNUNET_OK;
+}
+
+
/**
* Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
*
{
struct GNUNET_STREAM_Socket *socket = cls;
+ return handle_generic_close_ack (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *)
+ message,
+ atsi,
+ SHUT_WR);
+}
+
+
+/**
+ * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
+ *
+ * @param socket the socket
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_receive_close (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_STREAM_MessageHeader *message,
+ const struct GNUNET_ATS_Information *atsi)
+{
+ struct GNUNET_STREAM_MessageHeader *receive_close_ack;
+
+ switch (socket->state)
+ {
+ case STATE_INIT:
+ case STATE_LISTEN:
+ case STATE_HELLO_WAIT:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Ignoring RECEIVE_CLOSE as it cannot be handled now\n");
+ return GNUNET_OK;
+ default:
+ break;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received RECEIVE_CLOSE from %x\n",
+ socket->other_peer);
+ receive_close_ack =
+ GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ receive_close_ack->header.size =
+ htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ receive_close_ack->header.type =
+ htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
+ queue_message (socket,
+ receive_close_ack,
+ &set_state_closed,
+ NULL);
+
+ /* FIXME: Handle the case where write handle is present; the write operation
+ should be deemed as finised and the write continuation callback
+ has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
return GNUNET_OK;
}
{
struct GNUNET_STREAM_Socket *socket = cls;
- return GNUNET_OK;
+ return
+ handle_receive_close (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *) message,
+ atsi);
}
{
struct GNUNET_STREAM_Socket *socket = cls;
+ return handle_generic_close_ack (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *)
+ message,
+ atsi,
+ SHUT_RD);
+}
+
+
+/**
+ * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
+ *
+ * @param socket the socket
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_close (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_STREAM_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_MessageHeader *close_ack;
+
+ switch (socket->state)
+ {
+ case STATE_INIT:
+ case STATE_LISTEN:
+ case STATE_HELLO_WAIT:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Ignoring RECEIVE_CLOSE as it cannot be handled now\n");
+ return GNUNET_OK;
+ default:
+ break;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received CLOSE from %x\n",
+ socket->other_peer);
+ close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
+ queue_message (socket,
+ close_ack,
+ &set_state_closed,
+ NULL);
+ if (socket->state == STATE_CLOSED)
+ return GNUNET_OK;
+
+ GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
+ socket->receive_buffer = NULL;
+ socket->receive_buffer_size = 0;
return GNUNET_OK;
}
{
struct GNUNET_STREAM_Socket *socket = cls;
- return GNUNET_OK;
+ return handle_close (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *) message,
+ atsi);
}
void **tunnel_ctx,
const struct GNUNET_PeerIdentity *sender,
const struct GNUNET_MessageHeader *message,
- const struct GNUNET_ATS_Information*atsi)
+ const struct GNUNET_ATS_Information *atsi)
{
struct GNUNET_STREAM_Socket *socket = cls;
- return GNUNET_OK;
+ return handle_generic_close_ack (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *)
+ message,
+ atsi,
+ SHUT_RDWR);
}
/*****************************/
if (GNUNET_PEER_search (sender) != socket->other_peer)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Received HELLO from non-confirming peer\n",
- socket->our_id);
+ "Received HELLO from non-confirming peer\n");
return GNUNET_YES;
}
ntohs (message->type));
GNUNET_assert (socket->tunnel == tunnel);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Received HELLO from %x\n",
- socket->our_id,
+ "Received HELLO from %x\n",
socket->other_peer);
if (STATE_INIT == socket->state)
{
- /* Get the random sequence number */
- socket->write_sequence_number =
- GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Generated write sequence number %u\n",
- socket->our_id,
- (unsigned int) socket->write_sequence_number);
- reply =
- GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
- reply->header.header.size =
- htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
- reply->header.header.type =
- htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
- reply->sequence_number = htonl (socket->write_sequence_number);
- reply->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
+ reply = generate_hello_ack_msg (socket);
queue_message (socket,
&reply->header,
&set_state_hello_wait,
if (STATE_HELLO_WAIT == socket->state)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Received HELLO_ACK from %x\n",
- socket->our_id,
+ "Received HELLO_ACK from %x\n",
socket->other_peer);
socket->read_sequence_number = ntohl (ack_message->sequence_number);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Read sequence number %u\n",
- socket->our_id,
+ "Read sequence number %u\n",
(unsigned int) socket->read_sequence_number);
socket->receiver_window_available =
ntohl (ack_message->receiver_window_size);
const struct GNUNET_MessageHeader *message,
const struct GNUNET_ATS_Information*atsi)
{
- struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+ // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
return GNUNET_OK;
}
{
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
- return GNUNET_OK;
+ return handle_generic_close_ack (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *)
+ message,
+ atsi,
+ SHUT_WR);
}
{
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
- return GNUNET_OK;
+ return
+ handle_receive_close (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *) message,
+ atsi);
}
{
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
- return GNUNET_OK;
+ return handle_generic_close_ack (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *)
+ message,
+ atsi,
+ SHUT_RD);
}
/**
* Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
*
- * @param cls the closure
+ * @param cls the listen socket (from GNUNET_MESH_connect in
+ * GNUNET_STREAM_listen)
* @param tunnel connection to the other end
* @param tunnel_ctx the socket
* @param sender who sent the message
const struct GNUNET_ATS_Information*atsi)
{
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
-
- return GNUNET_OK;
+
+ return handle_close (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *) message,
+ atsi);
}
{
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
- return GNUNET_OK;
+ return handle_generic_close_ack (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *)
+ message,
+ atsi,
+ SHUT_RDWR);
}
/**
- * Message Handler for mesh
+ * Handler for DATA_ACK messages
*
* @param socket the socket through which the ack was received
* @param tunnel connection to the other end
{
unsigned int packet;
int need_retransmission;
+
if (GNUNET_PEER_search (sender) != socket->other_peer)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Received ACK from non-confirming peer\n",
- socket->our_id);
+ "Received ACK from non-confirming peer\n");
return GNUNET_YES;
}
switch (socket->state)
{
case (STATE_ESTABLISHED):
+ case (STATE_RECEIVE_CLOSED):
+ case (STATE_RECEIVE_CLOSE_WAIT):
if (NULL == socket->write_handle)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Received DATA_ACK when write_handle is NULL\n",
- socket->our_id);
+ "Received DATA_ACK when write_handle is NULL\n");
return GNUNET_OK;
}
/* FIXME: increment in the base sequence number is breaking current flow
*/
if (!((socket->write_sequence_number
- - htonl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
+ - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Received DATA_ACK with unexpected base sequence "
- "number\n",
- socket->our_id);
+ "Received DATA_ACK with unexpected base sequence number\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Current write sequence: %u; Ack's base sequence: %u\n",
+ socket->write_sequence_number,
+ ntohl (ack->base_sequence_number));
return GNUNET_OK;
}
/* FIXME: include the case when write_handle is cancelled - ignore the
acks */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Received DATA_ACK from %x\n",
- socket->our_id,
+ "Received DATA_ACK from %x\n",
socket->other_peer);
/* Cancel the retransmission task */
GNUNET_SCHEDULER_NO_TASK;
}
- /* FIXME: Bits in the ack_bitmap are only to be set; Once set they cannot
- be unset */
- socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (NULL == socket->write_handle->messages[packet]) break;
+ if (ntohl (ack->base_sequence_number)
+ >= ntohl (socket->write_handle->messages[packet]->sequence_number))
+ ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
+ packet,
+ GNUNET_YES);
+ else
+ if (GNUNET_YES ==
+ ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
+ ntohl (socket->write_handle->messages[packet]->sequence_number)
+ - ntohl (ack->base_sequence_number)))
+ ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
+ packet,
+ GNUNET_YES);
+ }
+
+ /* Update the receive window remaining
+ FIXME : Should update with the value from a data ack with greater
+ sequence number */
socket->receiver_window_available =
ntohl (ack->receive_window_remaining);
socket->status,
socket->write_handle->size);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Write completion callback completed\n",
- socket->our_id);
+ "Write completion callback completed\n");
/* We are done with the write handle - Freeing it */
GNUNET_free (socket->write_handle);
socket->write_handle = NULL;
/**
- * Message Handler for mesh
+ * Handler for DATA_ACK messages
*
* @param cls the 'struct GNUNET_STREAM_Socket'
* @param tunnel connection to the other end
/**
- * Message Handler for mesh
+ * Handler for DATA_ACK messages
*
* @param cls the server's listen socket
* @param tunnel connection to the other end
if (connected_peer != socket->other_peer)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: A peer which is not our target has connected",
- "to our tunnel\n",
- socket->our_id);
+ "A peer which is not our target has connected to our tunnel\n");
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Target peer %x connected\n",
- socket->our_id,
+ "Target peer %x connected\n",
connected_peer);
/* Set state to INIT */
mesh_peer_disconnect_callback (void *cls,
const struct GNUNET_PeerIdentity *peer)
{
-
+ struct GNUNET_STREAM_Socket *socket=cls;
+
+ /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Other peer %x disconnected\n",
+ socket->other_peer);
}
socket->tunnel = tunnel;
socket->session_id = 0; /* FIXME */
socket->state = STATE_INIT;
- socket->lsocket = lsocket;
- socket->our_id = lsocket->our_id;
-
+ socket->lsocket = lsocket;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Peer %x initiated tunnel to us\n",
- socket->our_id,
+ "Peer %x initiated tunnel to us\n",
socket->other_peer);
/* FIXME: Copy MESH handle from lsocket to socket */
void *tunnel_ctx)
{
struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
-
+
+ if (tunnel != socket->tunnel)
+ return;
+
+ GNUNET_break_op(0);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: Peer %x has terminated connection abruptly\n",
- socket->our_id,
+ "Peer %x has terminated connection abruptly\n",
socket->other_peer);
socket->status = GNUNET_STREAM_SHUTDOWN;
GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
socket->transmit_handle = NULL;
}
+ if (NULL != socket->ack_transmit_handle)
+ {
+ GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
+ GNUNET_free (socket->ack_msg);
+ socket->ack_msg = NULL;
+ socket->ack_transmit_handle = NULL;
+ }
+ /* Stop Tasks using socket->tunnel */
+ if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
+ {
+ GNUNET_SCHEDULER_cancel (socket->ack_task_id);
+ socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+ {
+ GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
+ socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
+ /* FIXME: Cancel all other tasks using socket->tunnel */
socket->tunnel = NULL;
}
...)
{
struct GNUNET_STREAM_Socket *socket;
- struct GNUNET_PeerIdentity own_peer_id;
enum GNUNET_STREAM_Option option;
+ GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
va_list vargs; /* Variable arguments */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
socket->other_peer = GNUNET_PEER_intern (target);
socket->open_cb = open_cb;
socket->open_cls = open_cb_cls;
- GNUNET_TESTING_get_peer_identity (cfg, &own_peer_id);
- socket->our_id = GNUNET_PEER_intern (&own_peer_id);
-
/* Set defaults */
socket->retransmit_timeout =
GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
10, /* QUEUE size as parameter? */
socket, /* cls */
NULL, /* No inbound tunnel handler */
- &tunnel_cleaner, /* FIXME: not required? */
+ NULL, /* No in-tunnel cleaner */
client_message_handlers,
- &app_port); /* We don't get inbound tunnels */
+ ports); /* We don't get inbound tunnels */
if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */
{
GNUNET_free (socket);
/**
- * Shutdown the stream for reading or writing (man 2 shutdown).
+ * Shutdown the stream for reading or writing (similar to man 2 shutdown).
*
* @param socket the stream socket
- * @param how SHUT_RD, SHUT_WR or SHUT_RDWR
+ * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
+ * @param completion_cb the callback that will be called upon successful
+ * shutdown of given operation
+ * @param completion_cls the closure for the completion callback
+ * @return the shutdown handle
*/
-void
+struct GNUNET_STREAM_ShutdownHandle *
GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
- int how)
+ int operation,
+ GNUNET_STREAM_ShutdownCompletion completion_cb,
+ void *completion_cls)
+{
+ struct GNUNET_STREAM_ShutdownHandle *handle;
+ struct GNUNET_STREAM_MessageHeader *msg;
+
+ GNUNET_assert (NULL == socket->shutdown_handle);
+
+ handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
+ handle->socket = socket;
+ handle->completion_cb = completion_cb;
+ handle->completion_cls = completion_cls;
+ socket->shutdown_handle = handle;
+
+ msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ switch (operation)
+ {
+ case SHUT_RD:
+ handle->operation = SHUT_RD;
+ if (NULL != socket->read_handle)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Existing read handle should be cancelled before shutting"
+ " down reading\n");
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
+ queue_message (socket,
+ msg,
+ &set_state_receive_close_wait,
+ NULL);
+ break;
+ case SHUT_WR:
+ handle->operation = SHUT_WR;
+ if (NULL != socket->write_handle)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Existing write handle should be cancelled before shutting"
+ " down writing\n");
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
+ queue_message (socket,
+ msg,
+ &set_state_transmit_close_wait,
+ NULL);
+ break;
+ case SHUT_RDWR:
+ handle->operation = SHUT_RDWR;
+ if (NULL != socket->write_handle)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Existing write handle should be cancelled before shutting"
+ " down writing\n");
+ if (NULL != socket->read_handle)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Existing read handle should be cancelled before shutting"
+ " down reading\n");
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
+ queue_message (socket,
+ msg,
+ &set_state_close_wait,
+ NULL);
+ break;
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "GNUNET_STREAM_shutdown called with invalid value for "
+ "parameter operation -- Ignoring\n");
+ GNUNET_free (msg);
+ GNUNET_free (handle);
+ return NULL;
+ }
+ handle->close_msg_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+ &close_msg_retransmission_task,
+ handle);
+ return handle;
+}
+
+
+/**
+ * Cancels a pending shutdown
+ *
+ * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
+ */
+void
+GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
{
+ if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
+ GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
+ GNUNET_free (handle);
return;
}
{
struct MessageQueue *head;
+ GNUNET_break (NULL == socket->read_handle);
+ GNUNET_break (NULL == socket->write_handle);
+
if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
- {
- /* socket closed with read task pending!? */
- GNUNET_break (0);
- GNUNET_SCHEDULER_cancel (socket->read_task_id);
- socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
- }
+ {
+ /* socket closed with read task pending!? */
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_cancel (socket->read_task_id);
+ socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
+
+ /* Terminate the ack'ing tasks if they are still present */
+ if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (socket->ack_task_id);
+ socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
/* Clear Transmit handles */
if (NULL != socket->transmit_handle)
GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
socket->transmit_handle = NULL;
}
+ if (NULL != socket->ack_transmit_handle)
+ {
+ GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
+ GNUNET_free (socket->ack_msg);
+ socket->ack_msg = NULL;
+ socket->ack_transmit_handle = NULL;
+ }
/* Clear existing message queue */
while (NULL != (head = socket->queue_head)) {
{
/* FIXME: Add variable args for passing configration options? */
struct GNUNET_STREAM_ListenSocket *lsocket;
- struct GNUNET_PeerIdentity our_peer_id;
+ GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
lsocket->port = app_port;
lsocket->listen_cb = listen_cb;
lsocket->listen_cb_cls = listen_cb_cls;
- GNUNET_TESTING_get_peer_identity (cfg, &our_peer_id);
- lsocket->our_id = GNUNET_PEER_intern (&our_peer_id);
lsocket->mesh = GNUNET_MESH_connect (cfg,
10, /* FIXME: QUEUE size as parameter? */
lsocket, /* Closure */
&new_tunnel_notify,
&tunnel_cleaner,
server_message_handlers,
- &app_port);
+ ports);
GNUNET_assert (NULL != lsocket->mesh);
return lsocket;
}
/**
- * Tries to write the given data to the stream
+ * Tries to write the given data to the stream. The maximum size of data that
+ * can be written as part of a write operation is (64 * (64000 - sizeof (struct
+ * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
+ * violation, however only the said number of maximum bytes will be written.
*
* @param socket the socket representing a stream
* @param data the data buffer from where the data is written into the stream
* @param size the number of bytes to be written from the data buffer
* @param timeout the timeout period
- * @param write_cont the function to call upon writing some bytes into the stream
+ * @param write_cont the function to call upon writing some bytes into the
+ * stream
* @param write_cont_cls the closure
- * @return handle to cancel the operation
+ *
+ * @return handle to cancel the operation; if a previous write is pending or
+ * the stream has been shutdown for this operation then write_cont is
+ * immediately called and NULL is returned.
*/
struct GNUNET_STREAM_IOWriteHandle *
GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
GNUNET_break (0);
return NULL;
}
- if (!((STATE_ESTABLISHED == socket->state)
- || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
- || (STATE_RECEIVE_CLOSED == socket->state)))
+
+ switch (socket->state)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%x: Attempting to write on a closed (OR) not-yet-established"
- "stream\n",
- socket->our_id);
+ case STATE_TRANSMIT_CLOSED:
+ case STATE_TRANSMIT_CLOSE_WAIT:
+ case STATE_CLOSED:
+ case STATE_CLOSE_WAIT:
+ if (NULL != write_cont)
+ write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s() END\n", __func__);
return NULL;
- }
+ case STATE_INIT:
+ case STATE_LISTEN:
+ case STATE_HELLO_WAIT:
+ if (NULL != write_cont)
+ /* FIXME: GNUNET_STREAM_SYSERR?? */
+ write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s() END\n", __func__);
+ return NULL;
+ case STATE_ESTABLISHED:
+ case STATE_RECEIVE_CLOSED:
+ case STATE_RECEIVE_CLOSE_WAIT:
+ break;
+ }
+
if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
+ io_handle->socket = socket;
io_handle->write_cont = write_cont;
io_handle->write_cont_cls = write_cont_cls;
io_handle->size = size;
}
+
/**
- * Tries to read data from the stream
+ * Tries to read data from the stream.
*
* @param socket the socket representing a stream
* @param timeout the timeout period
* @param proc function to call with data (once only)
* @param proc_cls the closure for proc
- * @return handle to cancel the operation
+ *
+ * @return handle to cancel the operation; if the stream has been shutdown for
+ * this type of opeartion then the DataProcessor is immediately
+ * called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
*/
struct GNUNET_STREAM_IOReadHandle *
GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
struct GNUNET_STREAM_IOReadHandle *read_handle;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: %s()\n",
- socket->our_id,
+ "%s()\n",
__func__);
/* Return NULL if there is already a read handle; the user has to cancel that
GNUNET_assert (NULL != proc);
+ switch (socket->state)
+ {
+ case STATE_RECEIVE_CLOSED:
+ case STATE_RECEIVE_CLOSE_WAIT:
+ case STATE_CLOSED:
+ case STATE_CLOSE_WAIT:
+ proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s() END\n",
+ __func__);
+ return NULL;
+ default:
+ break;
+ }
+
read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
read_handle->proc = proc;
read_handle->proc_cls = proc_cls;
}
/* Setup the read timeout task */
- socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
- &read_io_timeout,
- socket);
+ socket->read_io_timeout_task_id =
+ GNUNET_SCHEDULER_add_delayed (timeout,
+ &read_io_timeout,
+ socket);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%x: %s() END\n",
- socket->our_id,
+ "%s() END\n",
__func__);
return read_handle;
}
void
GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
{
- /* FIXME: Should cancel the write retransmission task */
+ struct GNUNET_STREAM_Socket *socket = ioh->socket;
+ unsigned int packet;
+
+ GNUNET_assert (NULL != socket->write_handle);
+ GNUNET_assert (socket->write_handle == ioh);
+
+ if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+ {
+ GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
+ socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
+
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (NULL == ioh->messages[packet]) break;
+ GNUNET_free (ioh->messages[packet]);
+ }
+
+ GNUNET_free (socket->write_handle);
+ socket->write_handle = NULL;
return;
}