*/
struct GNUNET_STREAM_Socket *socket;
- /**
- * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
- */
- int operation;
-
/**
* Shutdown completion callback
*/
* Closure for completion callback
*/
void *completion_cls;
+
+ /**
+ * Close message retransmission task id
+ */
+ GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
+
+ /**
+ * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
+ */
+ int operation;
};
}
+/**
+ * Retransmission task for shutdown messages
+ *
+ * @param cls the shutdown handle
+ * @param tc the Task Context
+ */
+static void
+close_msg_retransmission_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
+ struct GNUNET_STREAM_MessageHeader *msg;
+ struct GNUNET_STREAM_Socket *socket;
+
+ GNUNET_assert (NULL != shutdown_handle);
+ socket = shutdown_handle->socket;
+
+ msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ switch (shutdown_handle->operation)
+ {
+ case SHUT_RDWR:
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
+ break;
+ case SHUT_RD:
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
+ break;
+ case SHUT_WR:
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
+ break;
+ default:
+ GNUNET_free (msg);
+ shutdown_handle->close_msg_retransmission_task_id =
+ GNUNET_SCHEDULER_NO_TASK;
+ return;
+ }
+ queue_message (socket, msg, NULL, NULL);
+ shutdown_handle->close_msg_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+ &close_msg_retransmission_task,
+ shutdown_handle);
+}
+
+
/**
* Function to modify a bit in GNUNET_STREAM_AckBitmap
*
}
+/**
+ * Callback to set state to RECEIVE_CLOSE_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_receive_close_wait (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%x: Attaing RECEIVE_CLOSE_WAIT state\n",
+ socket->our_id);
+ socket->state = STATE_RECEIVE_CLOSE_WAIT;
+ GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
+ socket->receive_buffer = NULL;
+ socket->receive_buffer_size = 0;
+}
+
+
+/**
+ * Callback to set state to TRANSMIT_CLOSE_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_transmit_close_wait (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%x: Attaing TRANSMIT_CLOSE_WAIT state\n",
+ socket->our_id);
+ socket->state = STATE_TRANSMIT_CLOSE_WAIT;
+}
+
+
/**
* Callback to set state to CLOSED
*
struct GNUNET_MESH_Tunnel *tunnel,
const struct GNUNET_PeerIdentity *sender,
const struct GNUNET_STREAM_MessageHeader *message,
- const struct GNUNET_ATS_Information*atsi)
+ const struct GNUNET_ATS_Information *atsi)
{
struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
switch (socket->state)
{
case STATE_CLOSE_WAIT:
- socket->state = STATE_CLOSED;
if ( (NULL == shutdown_handle) ||
(SHUT_RDWR != shutdown_handle->operation) )
{
socket->our_id);
return GNUNET_OK;
}
+
+ if (GNUNET_SCHEDULER_NO_TASK
+ != shutdown_handle->close_msg_retransmission_task_id)
+ {
+ GNUNET_SCHEDULER_cancel
+ (shutdown_handle->close_msg_retransmission_task_id);
+ shutdown_handle->close_msg_retransmission_task_id =
+ GNUNET_SCHEDULER_NO_TASK;
+ }
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%x: Received CLOSE_ACK from %x\n",
socket->our_id,
socket->other_peer);
+ socket->state = STATE_CLOSED;
if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
shutdown_handle->completion_cb(shutdown_handle->completion_cls,
SHUT_RDWR);
void **tunnel_ctx,
const struct GNUNET_PeerIdentity *sender,
const struct GNUNET_MessageHeader *message,
- const struct GNUNET_ATS_Information*atsi)
+ const struct GNUNET_ATS_Information *atsi)
{
struct GNUNET_STREAM_Socket *socket = cls;
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Existing read handle should be cancelled before shutting"
" down reading\n");
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
+ queue_message (socket,
+ msg,
+ &set_state_receive_close_wait,
+ NULL);
break;
case SHUT_WR:
handle->operation = SHUT_WR;
-
+ if (NULL != socket->write_handle)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Existing write handle should be cancelled before shutting"
+ " down writing\n");
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
+ queue_message (socket,
+ msg,
+ &set_state_transmit_close_wait,
+ NULL);
break;
case SHUT_RDWR:
handle->operation = SHUT_RDWR;
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"GNUNET_STREAM_shutdown called with invalid value for "
"parameter operation -- Ignoring\n");
+ GNUNET_free (msg);
GNUNET_free (handle);
return NULL;
}
+ handle->close_msg_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+ &close_msg_retransmission_task,
+ handle);
return handle;
}
void
GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
{
+ if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
+ GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
+ GNUNET_free (handle);
return;
}