#include "transport.h"
#define MAX_PROBES 20
+#define MAX_RETRIES 3
+#define RETRY 0
#define LOG(kind,...) GNUNET_log_from (kind, "transport-unix",__VA_ARGS__)
struct UNIXMessage * msg;
size_t msgsize;
+ size_t payload;
- struct GNUNET_TIME_Relative timeout;
+ struct GNUNET_TIME_Absolute timeout;
unsigned int priority;
struct Session *session;
continue;
GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw);
if (NULL != msgw->cont)
- msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR);
+ msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR,
+ msgw->payload, 0);
GNUNET_free (msgw->msg);
GNUNET_free (msgw);
removed = GNUNET_YES;
unix_transport_server_stop (void *cls)
{
struct Plugin *plugin = cls;
-
- struct UNIXMessageWrapper * msgw = plugin->msg_head;
+ struct UNIXMessageWrapper * msgw;
while (NULL != (msgw = plugin->msg_head))
{
GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw);
if (msgw->cont != NULL)
- msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR);
+ msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR,
+ msgw->payload, 0);
GNUNET_free (msgw->msg);
GNUNET_free (msgw);
}
* @param timeout when should we time out (give up) if we can not transmit?
* @param addr the addr to send the message to, needs to be a sockaddr for us
* @param addrlen the len of addr
+ * @param payload bytes payload to send
* @param cont continuation to call once the message has
* been transmitted (or if the transport is ready
* for the next transmission call; or if the
* peer disconnected...)
* @param cont_cls closure for cont
*
- * @return the number of bytes written, -1 on errors
+ * @return on success : the number of bytes written, 0 n retry, -1 on errors
*/
static ssize_t
unix_real_send (void *cls,
struct GNUNET_NETWORK_Handle *send_handle,
const struct GNUNET_PeerIdentity *target, const char *msgbuf,
size_t msgbuf_size, unsigned int priority,
- struct GNUNET_TIME_Relative timeout, const void *addr,
- size_t addrlen, GNUNET_TRANSPORT_TransmitContinuation cont,
+ struct GNUNET_TIME_Absolute timeout,
+ const void *addr,
+ size_t addrlen,
+ size_t payload,
+ GNUNET_TRANSPORT_TransmitContinuation cont,
void *cont_cls)
{
struct Plugin *plugin = cls;
if (send_handle == NULL)
{
- /* We do not have a send handle */
- GNUNET_break (0);
- if (cont != NULL)
- cont (cont_cls, target, GNUNET_SYSERR);
- return -1;
+ GNUNET_break (0); /* We do not have a send handle */
+ return GNUNET_SYSERR;
}
if ((addr == NULL) || (addrlen == 0))
{
- /* Can never send if we don't have an address */
- GNUNET_break (0);
- if (cont != NULL)
- cont (cont_cls, target, GNUNET_SYSERR);
- return -1;
+ GNUNET_break (0); /* Can never send if we don't have an address */
+ return GNUNET_SYSERR;
}
/* Prepare address */
sb = (struct sockaddr *) &un;
sbs = slen;
+resend:
/* Send the data */
sent = 0;
sent = GNUNET_NETWORK_socket_sendto (send_handle, msgbuf, msgbuf_size, sb, sbs);
- if ((GNUNET_SYSERR == sent) && ((errno == EAGAIN) || (errno == ENOBUFS)))
- {
- /* We have to retry later: retry */
- return 0;
- }
-
- if ((GNUNET_SYSERR == sent) && (errno == EMSGSIZE))
+ if (GNUNET_SYSERR == sent)
{
- socklen_t size = 0;
- socklen_t len = sizeof (size);
-
- GNUNET_NETWORK_socket_getsockopt ((struct GNUNET_NETWORK_Handle *)
- send_handle, SOL_SOCKET, SO_SNDBUF, &size,
- &len);
-
- if (size < msgbuf_size)
+ if (errno == EAGAIN)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Trying to increase socket buffer size from %i to %i for message size %i\n",
- size,
- ((msgbuf_size / 1000) + 2) * 1000,
- msgbuf_size);
- size = ((msgbuf_size / 1000) + 2) * 1000;
- if (GNUNET_NETWORK_socket_setsockopt
- ((struct GNUNET_NETWORK_Handle *) send_handle, SOL_SOCKET, SO_SNDBUF,
- &size, sizeof (size)) == GNUNET_OK)
+ return RETRY; /* We have to retry later */
+ }
+ if (errno == ENOBUFS)
+ {
+ return RETRY; /* We have to retry later */
+ }
+ if (errno == EMSGSIZE)
+ {
+ socklen_t size = 0;
+ socklen_t len = sizeof (size);
+
+ GNUNET_NETWORK_socket_getsockopt ((struct GNUNET_NETWORK_Handle *)
+ send_handle, SOL_SOCKET, SO_SNDBUF, &size,
+ &len);
+ if (size < msgbuf_size)
{
- /* Increased buffer size, retry sending */
- return 0;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Trying to increase socket buffer size from %i to %i for message size %i\n",
+ size, ((msgbuf_size / 1000) + 2) * 1000, msgbuf_size);
+ size = ((msgbuf_size / 1000) + 2) * 1000;
+ if (GNUNET_OK == GNUNET_NETWORK_socket_setsockopt
+ ((struct GNUNET_NETWORK_Handle *) send_handle, SOL_SOCKET, SO_SNDBUF,
+ &size, sizeof (size)))
+ goto resend; /* Increased buffer size, retry sending */
+ else
+ {
+ /* Could not increase buffer size: error, no retry */
+ GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "setsockopt");
+ return GNUNET_SYSERR;
+ }
}
else
{
- /* Could not increase buffer size: error, no retry */
- GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "setsockopt");
- return -1;
+ /* Buffer is bigger than message: error, no retry
+ * This should never happen!*/
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
}
}
- else
- {
- /* Buffer is bigger than message: error, no retry
- * This should never happen!*/
- GNUNET_break (0);
- return -1;
- }
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"UNIX transmit %u-byte message to %s (%d: %s)\n",
(unsigned int) msgbuf_size, GNUNET_a2s (sb, sbs), (int) sent,
(sent < 0) ? STRERROR (errno) : "ok");
-
- /* Calling continuation */
- if (cont != NULL)
- {
- if (sent == GNUNET_SYSERR)
- cont (cont_cls, target, GNUNET_SYSERR);
- if (sent > 0)
- cont (cont_cls, target, GNUNET_OK);
- }
-
- /* return number of bytes successfully sent */
- if (sent > 0)
- return sent;
- if (sent == 0)
- {
- /* That should never happen */
- GNUNET_break (0);
- return -1;
- }
- /* failed and retry: return 0 */
- if (GNUNET_SYSERR == sent)
- return 0;
- /* default */
- return -1;
+ return sent;
}
struct gsi_ctx
return GNUNET_SYSERR;
}
LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending %u bytes with session for peer `%s' `%s'\n",
- msgbuf_size,
+ msgbuf_size,
GNUNET_i2s (&session->target),
(char *) session->addr);
wrapper = GNUNET_malloc (sizeof (struct UNIXMessageWrapper));
wrapper->msg = message;
wrapper->msgsize = ssize;
+ wrapper->payload = msgbuf_size;
wrapper->priority = priority;
- wrapper->timeout = to;
+ wrapper->timeout = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get(), to);
wrapper->cont = cont;
wrapper->cont_cls = cont_cls;
wrapper->session = session;
GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, wrapper);
plugin->bytes_in_queue += ssize;
- GNUNET_STATISTICS_set (plugin->env->stats,"# UNIX bytes in send queue",
+ GNUNET_STATISTICS_set (plugin->env->stats,"# bytes currently in UNIX buffers",
plugin->bytes_in_queue, GNUNET_NO);
LOG (GNUNET_ERROR_TYPE_DEBUG, "Sent %d bytes to `%s'\n", ssize,
LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from %s\n",
un->sun_path);
+
+
plugin->bytes_in_recv += ntohs(currhdr->size);
- GNUNET_STATISTICS_set (plugin->env->stats,"# UNIX bytes received",
+ GNUNET_STATISTICS_set (plugin->env->stats,"# bytes received via UNIX",
plugin->bytes_in_recv, GNUNET_NO);
addr = GNUNET_HELLO_address_allocate(sender, "unix", un->sun_path, strlen (un->sun_path) + 1);
static void
unix_plugin_select_write (struct Plugin * plugin)
{
- static int retry_counter;
int sent = 0;
- struct UNIXMessageWrapper * msgw = plugin->msg_head;
+
+ struct UNIXMessageWrapper * msgw = plugin->msg_tail;
+ while (NULL != msgw)
+ {
+ if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value > 0)
+ break; /* Message is ready for sending */
+ else
+ {
+ /* Message has a timeout */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Timeout for message with %llu bytes \n", msgw->msgsize);
+ GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw);
+ if (NULL != msgw->cont)
+ msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR, msgw->payload, 0);
+
+ plugin->bytes_in_queue -= msgw->msgsize;
+ GNUNET_STATISTICS_set (plugin->env->stats, "# bytes currently in UNIX buffers",
+ plugin->bytes_in_queue, GNUNET_NO);
+
+ plugin->bytes_discarded += msgw->msgsize;
+ GNUNET_STATISTICS_set (plugin->env->stats,"# UNIX bytes discarded",
+ plugin->bytes_discarded, GNUNET_NO);
+
+ GNUNET_free (msgw->msg);
+ GNUNET_free (msgw);
+ }
+ msgw = plugin->msg_tail;
+ }
+ if (NULL == msgw)
+ return; /* Nothing to send at the moment */
sent = unix_real_send (plugin,
plugin->unix_sock.desc,
msgw->timeout,
msgw->session->addr,
msgw->session->addrlen,
+ msgw->payload,
msgw->cont, msgw->cont_cls);
- if (sent == 0)
+ if (RETRY == sent)
{
- /* failed and retry */
- retry_counter++;
- GNUNET_STATISTICS_set (plugin->env->stats,"# UNIX retry attempt",
- retry_counter, GNUNET_NO);
- return;
- }
+ GNUNET_STATISTICS_update (plugin->env->stats,"# UNIX retry attempts",
+ 1, GNUNET_NO);
- if (retry_counter > 0 )
- {
- /* no retry: reset counter */
- retry_counter = 0;
- GNUNET_STATISTICS_set (plugin->env->stats,"# UNIX retry attempt",
- retry_counter, GNUNET_NO);
}
-
- if (sent == -1)
+ else if (GNUNET_SYSERR == sent)
{
/* failed and no retry */
+ if (NULL != msgw->cont)
+ msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR, msgw->payload, 0);
+
GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw);
GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize);
plugin->bytes_in_queue -= msgw->msgsize;
- GNUNET_STATISTICS_set (plugin->env->stats,"# UNIX bytes in send queue",
+ GNUNET_STATISTICS_set (plugin->env->stats, "# bytes currently in UNIX buffers",
plugin->bytes_in_queue, GNUNET_NO);
plugin->bytes_discarded += msgw->msgsize;
GNUNET_STATISTICS_set (plugin->env->stats,"# UNIX bytes discarded",
GNUNET_free (msgw);
return;
}
-
- if (sent > 0)
+ else if (sent > 0)
{
/* successfully sent bytes */
+ if (NULL != msgw->cont)
+ msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_OK, msgw->payload, msgw->msgsize);
+
GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw);
GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize);
plugin->bytes_in_queue -= msgw->msgsize;
- GNUNET_STATISTICS_set (plugin->env->stats,"# UNIX bytes in send queue",
+ GNUNET_STATISTICS_set (plugin->env->stats,"# bytes currently in UNIX buffers",
plugin->bytes_in_queue, GNUNET_NO);
plugin->bytes_in_sent += msgw->msgsize;
- GNUNET_STATISTICS_set (plugin->env->stats,"# UNIX bytes sent",
+ GNUNET_STATISTICS_set (plugin->env->stats,"# bytes transmitted via UNIX",
plugin->bytes_in_sent, GNUNET_NO);
GNUNET_free (msgw->msg);
GNUNET_free (msgw);
return;
}
-
}
if (sockets_created == 0)
LOG (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UNIX sockets\n"));
- plugin->session_map = GNUNET_CONTAINER_multihashmap_create(10);
+ plugin->session_map = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO);
GNUNET_SCHEDULER_add_now (address_notification, plugin);
return api;