};
+/**
+ * Functions of this type are called when a message is written
+ *
+ * @param socket the socket the written message was bound to
+ */
+typedef void (*SendFinishCallback) (void *cls,
+ struct GNUNET_STREAM_Socket *socket);
+
+
+/**
+ * The send message queue
+ */
+struct MessageQueue
+{
+ /**
+ * The message
+ */
+ struct GNUNET_STREAM_MessageHeader *message;
+
+ /**
+ * Callback to be called when the message is sent
+ */
+ SendFinishCallback finish_cb;
+
+ /**
+ * The closure for finish_cb
+ */
+ void *finish_cb_cls;
+
+ /**
+ * The next message in queue. Should be NULL in the last message
+ */
+ struct MessageQueue *next;
+};
+
+
/**
* The STREAM Socket Handler
*/
/**
* The current message associated with the transmit handle
*/
- struct GNUNET_MessageHeader *message;
+ struct MessageQueue *queue;
+
+ /**
+ * The queue tail, should always point to the last message in queue
+ */
+ struct MessageQueue *queue_tail;
+
+ /**
+ * The number of previous timeouts
+ */
+ unsigned int retries;
};
};
+
/**
* Default value in seconds for various timeouts
*/
/**
- * Callback function from send_message
+ * Callback function for sending hello message
*
- * @param cls closure the socket on which the send message was called
+ * @param cls closure the socket
* @param size number of bytes available in buf
* @param buf where the callee should write the message
* @return number of bytes written to buf
send_message_notify (void *cls, size_t size, void *buf)
{
struct GNUNET_STREAM_Socket *socket = cls;
+ struct MessageQueue *head;
size_t ret;
+ head = socket->queue;
socket->transmit_handle = NULL; /* Remove the transmit handle */
if (0 == size) /* request timed out */
{
- // statistics ("message timeout")
-
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Message not sent as tunnel was closed \n");
- ret = 0;
+ socket->retries++;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Message sending timed out. Retry %d \n",
+ socket->retries);
+ socket->transmit_handle =
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ 0, /* Corking */
+ 1, /* Priority */
+ /* FIXME: exponential backoff */
+ socket->retransmit_timeout,
+ &socket->other_peer,
+ ntohs (head->message->header.size),
+ &send_message_notify,
+ socket);
+ return 0;
+ }
+
+ ret = ntohs (head->message->header.size);
+ GNUNET_assert (size >= ret);
+ memcpy (buf, head->message, ret);
+ if (NULL != head->finish_cb)
+ {
+ head->finish_cb (socket, head->finish_cb_cls);
}
- else /* Size is more or equal to what was requested */
+
+ socket->queue = head->next; /* Will be NULL if the queue is empty */
+ GNUNET_free (head->message);
+ GNUNET_free (head);
+ head = socket->queue;
+ if (NULL != head) /* more pending messages to send */
{
- ret = ntohs (socket->message->size);
- GNUNET_assert (size >= ret);
- memcpy (buf, socket->message, ret);
+ socket->retries = 0;
+ socket->transmit_handle =
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ 0, /* Corking */
+ 1, /* Priority */
+ /* FIXME: exponential backoff */
+ socket->retransmit_timeout,
+ &socket->other_peer,
+ ntohs (head->message->header.size),
+ &send_message_notify,
+ socket);
}
- GNUNET_free (socket->message); /* Free the message memory */
- socket->message = NULL;
return ret;
}
/**
- * Sends a message using the mesh connection of a socket
+ * Queues a message for sending using the mesh connection of a socket
*
* @param socket the socket whose mesh connection is used
* @param message the message to be sent
+ * @param finish_cb the callback to be called when the message is sent
+ * @param finish_cb_cls the closure for the callback
*/
static void
-send_message (struct GNUNET_STREAM_Socket *socket,
- struct GNUNET_MessageHeader *message)
+queue_message (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_STREAM_MessageHeader *message,
+ SendFinishCallback finish_cb,
+ void *finish_cb_cls)
{
- socket->message = message;
- socket->transmit_handle =
- GNUNET_MESH_notify_transmit_ready (socket->tunnel,
- 0, /* Corking */
- 1, /* Priority */
- socket->retransmit_timeout,
- &socket->other_peer,
- ntohs (message->size),
- &send_message_notify,
- socket);
-}
+ struct MessageQueue *msg_info;
+
+ msg_info = GNUNET_malloc (sizeof (struct MessageQueue));
+ msg_info->message = message;
+ msg_info->finish_cb = finish_cb;
+ msg_info->finish_cb_cls = finish_cb_cls;
+ msg_info->next = NULL;
-/**
- * Makes state transition dependending on the given state
- *
- * @param socket the socket whose state has to be transitioned
- */
-static void
-make_state_transition (struct GNUNET_STREAM_Socket *socket)
-{
+ if (NULL == socket->queue)
+ {
+ socket->queue = msg_info;
+ socket->queue_tail = msg_info;
+ socket->retries = 0;
+ socket->transmit_handle =
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ 0, /* Corking */
+ 1, /* Priority */
+ socket->retransmit_timeout,
+ &socket->other_peer,
+ ntohs (message->header.size),
+ &send_message_notify,
+ socket);
+ }
+ else /* There is a pending message in queue */
+ {
+ socket->queue_tail->next = msg_info; /* Add to tail */
+ socket->queue_tail = msg_info;
+ }
}
+
+
/**
* Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
*
return GNUNET_OK;
}
+
/**
- * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
+ * Callback to set state to ESTABLISHED
*
- * @param cls the socket (set from GNUNET_MESH_connect)
- * @param tunnel connection to the other end
- * @param tunnel_ctx this is NULL
- * @param sender who sent the message
- * @param message the actual message
- * @param atsi performance data for the connection
- * @return GNUNET_OK to keep the connection open,
- * GNUNET_SYSERR to close it (signal serious error)
+ * @param cls the closure from queue_message
+ * @param socket the socket to requiring state change
*/
-static int
-client_handle_hello (void *cls,
- struct GNUNET_MESH_Tunnel *tunnel,
- void **tunnel_ctx,
- const struct GNUNET_PeerIdentity *sender,
- const struct GNUNET_MessageHeader *message,
- const struct GNUNET_ATS_Information*atsi)
+static void
+set_state_established (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
{
- struct GNUNET_STREAM_Socket *socket = cls;
+ socket->state = STATE_ESTABLISHED;
+}
- return GNUNET_OK;
+
+/**
+ * Callback to set state to HELLO_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket to requiring state change
+ */
+static void
+set_state_hello_wait (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ GNUNET_assert (STATE_INIT == socket->state);
+ socket->state = STATE_HELLO_WAIT;
}
const struct GNUNET_ATS_Information*atsi)
{
struct GNUNET_STREAM_Socket *socket = cls;
+ struct GNUNET_STREAM_MessageHeader *reply;
+
+ GNUNET_assert (socket->tunnel == tunnel);
+ if (STATE_HELLO_WAIT == socket->state)
+ {
+ reply =
+ GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ reply->header.size =
+ htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ reply->header.type =
+ htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
+ queue_message (socket,
+ reply,
+ &set_state_established,
+ NULL);
+ }
return GNUNET_OK;
}
{&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
{&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
sizeof (struct GNUNET_STREAM_AckMessage) },
- {&client_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO,
- sizeof (struct GNUNET_STREAM_MessageHeader)},
{&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
sizeof (struct GNUNET_STREAM_MessageHeader)},
{&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
const struct GNUNET_ATS_Information * atsi)
{
struct GNUNET_STREAM_Socket *socket = cls;
+ struct GNUNET_STREAM_MessageHeader *message;
if (0 != memcmp (&socket->other_peer,
peer,
/* Set state to INIT */
socket->state = STATE_INIT;
- /* Try to achieve ESTABLISHED state */
- make_state_transition (socket);
+ /* Send HELLO message */
+ message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
+ message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ queue_message (socket,
+ message,
+ &set_state_hello_wait,
+ NULL);
/* Call open callback */
if (NULL == socket->open_cls)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"STREAM_open callback is NULL\n");
}
if (NULL != socket->open_cb)
}
-/**
- * Function to find the mapped socket of a tunnel
- *
- * @param tunnel the tunnel whose associated socket has to be retrieved
- * @return the socket corresponding to the tunnel
- */
-static struct GNUNET_STREAM_Socket *
-find_socket (const struct GNUNET_MESH_Tunnel *tunnel)
-{
- /* Search tunnel in a list or hashtable and retrieve the socket */
-}
-
/*****************/
/* API functions */
/*****************/
{
GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
}
- /* Clear existing message queue message */
- if (NULL != socket->message)
- {
- GNUNET_free (socket->message);
- }
+ /* FIXME: Clear message queue */
/* Close associated tunnel */
if (NULL != socket->tunnel)
{
socket->tunnel = tunnel;
socket->session_id = 0; /* FIXME */
socket->other_peer = *initiator;
- socket->state = STATE_LISTEN;
+ socket->state = STATE_INIT;
if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
socket,
&socket->other_peer))
{
socket->state = STATE_CLOSED;
- make_state_transition (socket);
+ /* FIXME: Send CLOSE message and then free */
GNUNET_free (socket);
GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
}
- else
- {
- make_state_transition (socket);
- }
return socket;
}
{
struct GNUNET_STREAM_ListenSocket *lsocket = cls;
struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
+ struct MessageQueue *head;
- socket = find_socket (tunnel);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Peer %s has terminated connection abruptly\n",
GNUNET_i2s (&socket->other_peer));
socket->transmit_handle = NULL;
}
- /* Clear existing message queue message */
- if (NULL != socket->message)
- {
- GNUNET_free (socket->message);
- socket->message = NULL;
- }
+ /* Clear existing message queue */
+ while (NULL != socket->queue) {
+ head = socket->queue;
+ socket->queue = head->next;
+ GNUNET_free (head->message);
+ GNUNET_free (head);
+ }
}
GNUNET_MESH_ApplicationType app_types[2];
app_types[0] = app_port;
- app_types[1] = NULL;
+ app_types[1] = 0;
lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
lsocket->port = app_port;
lsocket->listen_cb = listen_cb;