From 3d6d182f86ba766bc31223853508449dde5dee38 Mon Sep 17 00:00:00 2001 From: Matthias Wachs Date: Fri, 27 Jan 2012 13:48:30 +0000 Subject: [PATCH] complete select write implementation --- src/transport/plugin_transport_unix.c | 270 ++++++++++++-------------- 1 file changed, 124 insertions(+), 146 deletions(-) diff --git a/src/transport/plugin_transport_unix.c b/src/transport/plugin_transport_unix.c index 06918d39d..294ef5fd0 100644 --- a/src/transport/plugin_transport_unix.c +++ b/src/transport/plugin_transport_unix.c @@ -64,6 +64,8 @@ */ #define UNIX_NAT_DEFAULT_PORT 22086 +#define MAX_RETRIES 5 + GNUNET_NETWORK_STRUCT_BEGIN /** @@ -83,22 +85,26 @@ struct UNIXMessage }; -struct RetryList +struct UNIXMessageWrapper { - /** - * Pointer to next element. - */ - struct RetryList *next; + struct UNIXMessageWrapper *next; + struct UNIXMessageWrapper *prev; - /** - * Pointer to previous element. - */ - struct RetryList *prev; + struct UNIXMessage * msg; + size_t msgsize; - /** - * The actual retry context. - */ - struct RetrySendContext *retry_ctx; + int retry_counter; + + struct GNUNET_PeerIdentity target; + + struct GNUNET_TIME_Relative timeout; + unsigned int priority; + + void *addr; + size_t addrlen; + struct Session *session; + GNUNET_TRANSPORT_TransmitContinuation cont; + void *cont_cls; }; /** @@ -339,23 +345,15 @@ struct Plugin */ char *unix_socket_path; + struct UNIXMessageWrapper *msg_head; + struct UNIXMessageWrapper *msg_tail; + /** * ATS network */ struct GNUNET_ATS_Information ats_network; }; -/** - * Head of retry DLL. - */ -static struct RetryList *retry_list_head; - -/** - * Tail of retry DLL. - */ -static struct RetryList *retry_list_tail; - - /** * Disconnect from a remote node. Clean up session if we have one for this peer * @@ -383,21 +381,16 @@ static int unix_transport_server_stop (void *cls) { struct Plugin *plugin = cls; - struct RetryList *pos; - pos = retry_list_head; + struct UNIXMessageWrapper * msgw = plugin->msg_head; - while (NULL != (pos = retry_list_head)) + while (NULL != (msgw = plugin->msg_head)) { - GNUNET_CONTAINER_DLL_remove (retry_list_head, retry_list_tail, pos); - if (GNUNET_SCHEDULER_NO_TASK != pos->retry_ctx->retry_task) - { - GNUNET_SCHEDULER_cancel (pos->retry_ctx->retry_task); - } - GNUNET_free (pos->retry_ctx->msg); - GNUNET_free (pos->retry_ctx->addr); - GNUNET_free (pos->retry_ctx); - GNUNET_free (pos); + GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw); + if (msgw->cont != NULL) + msgw->cont (msgw->cont_cls, &msgw->target, GNUNET_SYSERR); + GNUNET_free (msgw->msg); + GNUNET_free (msgw); } if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK) @@ -440,34 +433,6 @@ unix_real_send (void *cls, struct RetrySendContext *incoming_retry_context, size_t addrlen, GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls); -/** - * Retry sending a message. - * - * @param cls closure a struct RetrySendContext - * @param tc context information - */ -void -retry_send_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct RetrySendContext *retry_ctx = cls; - - if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) - { - GNUNET_free (retry_ctx->msg); - GNUNET_free (retry_ctx->addr); - GNUNET_free (retry_ctx); - return; - } - - unix_real_send (retry_ctx->plugin, retry_ctx, retry_ctx->send_handle, - &retry_ctx->target, retry_ctx->msg, retry_ctx->msg_size, - retry_ctx->priority, - GNUNET_TIME_absolute_get_remaining (retry_ctx->timeout), - retry_ctx->addr, retry_ctx->addrlen, retry_ctx->cont, - retry_ctx->cont_cls); - return; -} - /** * Actually send out the message, assume we've got the address and * send_handle squared away! @@ -499,16 +464,12 @@ unix_real_send (void *cls, struct RetrySendContext *incoming_retry_context, size_t addrlen, GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls) { - struct Plugin *plugin = cls; - struct UNIXMessage *message; - struct RetrySendContext *retry_ctx; - int ssize; + ssize_t sent; const void *sb; size_t sbs; struct sockaddr_un un; size_t slen; - struct RetryList *retry_list_entry; int retry; if (send_handle == NULL) @@ -533,16 +494,6 @@ unix_real_send (void *cls, struct RetrySendContext *incoming_retry_context, return 0; /* Can never send if we don't have an address!! */ } - /* Build the message to be sent */ - message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size); - ssize = sizeof (struct UNIXMessage) + msgbuf_size; - - message->header.size = htons (ssize); - message->header.type = htons (0); - memcpy (&message->sender, plugin->env->my_identity, - sizeof (struct GNUNET_PeerIdentity)); - memcpy (&message[1], msgbuf, msgbuf_size); - memset (&un, 0, sizeof (un)); un.sun_family = AF_UNIX; slen = strlen (addr) + 1; @@ -562,8 +513,7 @@ unix_real_send (void *cls, struct RetrySendContext *incoming_retry_context, sb = (struct sockaddr *) &un; sbs = slen; retry = GNUNET_NO; - - sent = GNUNET_NETWORK_socket_sendto (send_handle, message, ssize, sb, sbs); + sent = GNUNET_NETWORK_socket_sendto (send_handle, msgbuf, msgbuf_size, sb, sbs); if ((GNUNET_SYSERR == sent) && ((errno == EAGAIN) || (errno == ENOBUFS))) retry = GNUNET_YES; @@ -577,14 +527,14 @@ unix_real_send (void *cls, struct RetrySendContext *incoming_retry_context, send_handle, SOL_SOCKET, SO_SNDBUF, &size, &len); - if (size < ssize) + if (size < msgbuf_size) { #if DEBUG_UNIX GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to increase socket buffer size from %i to %i for message size %i\n", size, ((ssize / 1000) + 2) * 1000, ssize); #endif - size = ((ssize / 1000) + 2) * 1000; + size = ((msgbuf_size / 1000) + 2) * 1000; if (GNUNET_NETWORK_socket_setsockopt ((struct GNUNET_NETWORK_Handle *) send_handle, SOL_SOCKET, SO_SNDBUF, &size, sizeof (size)) == GNUNET_OK) @@ -594,71 +544,31 @@ unix_real_send (void *cls, struct RetrySendContext *incoming_retry_context, } } - if (retry == GNUNET_YES) - { - if (incoming_retry_context == NULL) - { - retry_list_entry = GNUNET_malloc (sizeof (struct RetryList)); - retry_ctx = GNUNET_malloc (sizeof (struct RetrySendContext)); - retry_ctx->addr = GNUNET_malloc (addrlen); - retry_ctx->msg = GNUNET_malloc (msgbuf_size); - retry_ctx->plugin = plugin; - memcpy (retry_ctx->addr, addr, addrlen); - memcpy (retry_ctx->msg, msgbuf, msgbuf_size); - retry_ctx->msg_size = msgbuf_size; - retry_ctx->addrlen = addrlen; - retry_ctx->send_handle = send_handle; - retry_ctx->cont = cont; - retry_ctx->cont_cls = cont_cls; - retry_ctx->priority = priority; - retry_ctx->timeout = GNUNET_TIME_relative_to_absolute (timeout); - memcpy (&retry_ctx->target, target, sizeof (struct GNUNET_PeerIdentity)); - retry_ctx->delay = GNUNET_TIME_UNIT_MILLISECONDS; - retry_ctx->retry_list_entry = retry_list_entry; - retry_list_entry->retry_ctx = retry_ctx; - GNUNET_CONTAINER_DLL_insert (retry_list_head, retry_list_tail, - retry_list_entry); - } - else - { - retry_ctx = incoming_retry_context; - retry_ctx->delay = GNUNET_TIME_relative_multiply (retry_ctx->delay, 2); - } - retry_ctx->retry_task = - GNUNET_SCHEDULER_add_delayed (retry_ctx->delay, &retry_send_message, - retry_ctx); - - //GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG, "send"); - GNUNET_free (message); - return ssize; - } #if DEBUG_UNIX - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "UNIX transmit %u-byte message to %s (%d: %s)\n", - (unsigned int) ssize, GNUNET_a2s (sb, sbs), (int) sent, + (unsigned int) msgbuf_size, GNUNET_a2s (sb, sbs), (int) sent, (sent < 0) ? STRERROR (errno) : "ok"); #endif + /* Calling continuation */ if (cont != NULL) { - if (sent == GNUNET_SYSERR) + if ((sent == GNUNET_SYSERR) && (retry == GNUNET_NO)) cont (cont_cls, target, GNUNET_SYSERR); - else - { + if (sent > 0) cont (cont_cls, target, GNUNET_OK); - } } - if (incoming_retry_context != NULL) - { - GNUNET_CONTAINER_DLL_remove (retry_list_head, retry_list_tail, - incoming_retry_context->retry_list_entry); - GNUNET_free (incoming_retry_context->retry_list_entry); - GNUNET_free (incoming_retry_context->msg); - GNUNET_free (incoming_retry_context->addr); - GNUNET_free (incoming_retry_context); - } + /* return number of bytes successfully sent */ + if (sent > 0) + return sent; + /* failed and retry: return 0 */ + if ((GNUNET_SYSERR == sent) && (retry == GNUNET_YES)) + return 0; + /* failed and no retry: return -1 */ + if ((GNUNET_SYSERR == sent) && (retry == GNUNET_NO)) + return -1; - GNUNET_free (message); return sent; } @@ -756,25 +666,47 @@ unix_plugin_send_old (void *cls, const struct GNUNET_PeerIdentity *target, GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls) { struct Plugin *plugin = cls; - ssize_t sent; + struct UNIXMessage *message; + struct UNIXMessageWrapper *wrapper; + int ssize; GNUNET_assert (NULL == session); + /* Build the message to be sent */ + wrapper = GNUNET_malloc (sizeof (struct UNIXMessageWrapper) + addrlen); + message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size); + ssize = sizeof (struct UNIXMessage) + msgbuf_size; + #if DEBUG_UNIX GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asked to send message to `%s'\n", (char *) addr); #endif - sent = - unix_real_send (cls, NULL, plugin->unix_sock.desc, target, msgbuf, - msgbuf_size, priority, timeout, addr, addrlen, cont, - cont_cls); + + message->header.size = htons (ssize); + message->header.type = htons (0); + memcpy (&message->sender, plugin->env->my_identity, + sizeof (struct GNUNET_PeerIdentity)); + memcpy (&message[1], msgbuf, msgbuf_size); + + wrapper->msg = message; + wrapper->msgsize = ssize; + wrapper->priority = priority; + wrapper->timeout = timeout; + wrapper->cont = cont; + wrapper->cont_cls = cont_cls; + wrapper->addr = &wrapper[1]; + wrapper->addrlen = addrlen; + wrapper->retry_counter = 0; + memcpy (&wrapper->target, target, sizeof (struct GNUNET_PeerIdentity)); + memcpy (&wrapper[1], addr, addrlen); + + GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, wrapper); + #if DEBUG_UNIX GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent %d bytes to `%s'\n", sent, (char *) addr); #endif - if (sent == GNUNET_SYSERR) - return 0; - return sent; + return ssize; } @@ -880,7 +812,52 @@ unix_plugin_select_read (struct Plugin * plugin) static void unix_plugin_select_write (struct Plugin * plugin) { + int sent = 0; + struct UNIXMessageWrapper * msgw = plugin->msg_head; + + sent = unix_real_send (plugin, NULL, + plugin->unix_sock.desc, + &msgw->target, + (const char *) msgw->msg, + msgw->msgsize, + msgw->priority, + msgw->timeout, + msgw->addr, + msgw->addrlen, + msgw->cont, msgw->cont_cls); + + /* successfully sent bytes */ + if (sent > 0) + { + GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw); + GNUNET_free (msgw); + return; + } + + /* max retries */ + if (msgw->retry_counter > MAX_RETRIES) + { + msgw->cont (msgw->cont_cls, &msgw->target, GNUNET_SYSERR); + GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw); + GNUNET_break (0); + GNUNET_free (msgw); + return; + } + + /* failed and no retry */ + if (sent == -1) + { + GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw); + GNUNET_free (msgw); + return; + } + /* failed and retry */ + if (sent == 0) + { + msgw->retry_counter++; + return; + } } @@ -907,7 +884,8 @@ unix_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GNUNET_assert (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->unix_sock.desc)); - unix_plugin_select_write (plugin); + if (plugin->msg_head != NULL) + unix_plugin_select_write (plugin); } if ((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0) -- 2.25.1