From 843f898e3d1391aba7003e6e7c9ec0d7b3530fac Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 5 May 2013 12:01:06 +0000 Subject: [PATCH] -hopefully fixing #2869 --- src/transport/plugin_transport_unix.c | 837 +++++++++++++++----------- 1 file changed, 469 insertions(+), 368 deletions(-) diff --git a/src/transport/plugin_transport_unix.c b/src/transport/plugin_transport_unix.c index 646695f79..81aa9cb94 100644 --- a/src/transport/plugin_transport_unix.c +++ b/src/transport/plugin_transport_unix.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2010 Christian Grothoff (and other contributing authors) + (C) 2010, 2013 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -26,29 +26,22 @@ * @author Christian Grothoff * @author Nathan Evans */ - #include "platform.h" #include "gnunet_hello_lib.h" -#include "gnunet_connection_lib.h" -#include "gnunet_container_lib.h" -#include "gnunet_os_lib.h" -#include "gnunet_peerinfo_service.h" +#include "gnunet_util_lib.h" #include "gnunet_protocols.h" -#include "gnunet_resolver_service.h" -#include "gnunet_server_lib.h" -#include "gnunet_signatures.h" #include "gnunet_statistics_service.h" #include "gnunet_transport_service.h" #include "gnunet_transport_plugin.h" #include "transport.h" -#define MAX_PROBES 20 -#define MAX_RETRIES 3 -#define RETRY 0 - -#define LOG(kind,...) GNUNET_log_from (kind, "transport-unix",__VA_ARGS__) -#define DEFAULT_NAT_PORT 0 +/** + * Return code we give on 'send' if we failed to send right now + * but it makes sense to retry later. (Note: we might want to + * move this to the plugin API!?). + */ +#define RETRY 0 /** * How long until we give up on transmitting the welcome message? @@ -56,10 +49,15 @@ #define HOSTNAME_RESOLVE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) /** - * Starting port for listening and sending, eventually a config value + * Default "port" to use, if configuration does not specify. + * Essentially just a number appended to the UNIX path. */ #define UNIX_NAT_DEFAULT_PORT 22086 + +#define LOG(kind,...) GNUNET_log_from (kind, "transport-unix",__VA_ARGS__) + + GNUNET_NETWORK_STRUCT_BEGIN /** @@ -79,44 +77,89 @@ struct UNIXMessage }; + +/** + * Handle for a session. + */ struct Session { struct GNUNET_PeerIdentity target; + struct Plugin * plugin; + void *addr; + size_t addrlen; /** * Session timeout task */ GNUNET_SCHEDULER_TaskIdentifier timeout_task; - - struct Plugin * plugin; }; + struct UNIXMessageWrapper { + /** + * We keep messages in a doubly linked list. + */ struct UNIXMessageWrapper *next; + + /** + * We keep messages in a doubly linked list. + */ struct UNIXMessageWrapper *prev; + /** + * The actual payload (allocated separately right now). + */ struct UNIXMessage * msg; - size_t msgsize; - size_t payload; - - struct GNUNET_TIME_Absolute timeout; - unsigned int priority; + /** + * Session this message belongs to. + */ struct Session *session; + + /** + * Function to call upon transmission. + */ GNUNET_TRANSPORT_TransmitContinuation cont; + + /** + * Closure for 'cont'. + */ void *cont_cls; + + /** + * Timeout for this message. + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * Number of bytes in 'msg'. + */ + size_t msgsize; + + /** + * Number of bytes of payload encapsulated in 'msg'. + */ + size_t payload; + + /** + * Priority of the message (ignored, just dragged along in UNIX). + */ + unsigned int priority; }; -/* Forward definition */ + +/** + * Encapsulation of all of the state of the plugin. + */ struct Plugin; /** - * UNIX NAT "Session" + * UNIX "Session" */ struct PeerSession { @@ -166,6 +209,7 @@ struct PeerSession }; + /** * Information we keep for each of our listen sockets. */ @@ -177,7 +221,9 @@ struct UNIX_Sock_Info struct GNUNET_NETWORK_Handle *desc; /** - * The port we bound to + * The port we bound to (not an actual PORT, as UNIX domain sockets + * don't have ports, but rather a number in the path name to make this + * one unique). */ uint16_t port; }; @@ -188,30 +234,31 @@ struct UNIX_Sock_Info */ struct Plugin { + /** - * Our environment. + * ID of task used to update our addresses when one expires. */ - struct GNUNET_TRANSPORT_PluginEnvironment *env; + GNUNET_SCHEDULER_TaskIdentifier address_update_task; /** - * Sessions + * ID of select task */ - struct GNUNET_CONTAINER_MultiHashMap *session_map; + GNUNET_SCHEDULER_TaskIdentifier select_task; /** - * ID of task used to update our addresses when one expires. + * Number of bytes we currently have in our write queue. */ - GNUNET_SCHEDULER_TaskIdentifier address_update_task; + unsigned long long bytes_in_queue; /** - * ID of select task + * Our environment. */ - GNUNET_SCHEDULER_TaskIdentifier select_task; + struct GNUNET_TRANSPORT_PluginEnvironment *env; /** - * Integer to append to unix domain socket. + * Sessions */ - uint16_t port; + struct GNUNET_CONTAINER_MultiHashMap *session_map; /** * FD Read set @@ -223,65 +270,81 @@ struct Plugin */ struct GNUNET_NETWORK_FDSet *ws; - int with_ws; - - /** - * socket that we transmit all data with - */ - struct UNIX_Sock_Info unix_sock; - /** * Path of our unix domain socket (/tmp/unix-plugin-PORT) */ char *unix_socket_path; + /** + * Head of queue of messages to transmit. + */ struct UNIXMessageWrapper *msg_head; + + /** + * Tail of queue of messages to transmit. + */ struct UNIXMessageWrapper *msg_tail; + /** + * socket that we transmit all data with + */ + struct UNIX_Sock_Info unix_sock; + /** * ATS network */ struct GNUNET_ATS_Information ats_network; - unsigned int bytes_in_queue; - unsigned int bytes_in_sent; - unsigned int bytes_in_recv; - unsigned int bytes_discarded; + /** + * Is the write set in the current 'select' task? GNUNET_NO if the + * write queue was empty when the main task was scheduled, + * GNUNET_YES if we're already waiting for being allowed to write. + */ + int with_ws; + + /** + * Integer to append to unix domain socket. + */ + uint16_t port; + }; -/** - * Start session timeout - */ -static void -start_session_timeout (struct Session *s); /** * Increment session timeout due to activity + * + * @param s session for which the timeout should be moved */ static void reschedule_session_timeout (struct Session *s); + /** - * Cancel timeout + * We have been notified that our writeset has something to read. We don't + * know which socket needs to be read, so we have to check each one + * Then reschedule this function to be called again once more is available. + * + * @param cls the plugin handle + * @param tc the scheduling context (for rescheduling this function again) */ -static void -stop_session_timeout (struct Session *s); - - static void unix_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +/** + * Re-schedule the main 'select' callback (unix_plugin_select) + * for this plugin. + * + * @param plugin the plugin context + */ static void reschedule_select (struct Plugin * plugin) { - if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (plugin->select_task); plugin->select_task = GNUNET_SCHEDULER_NO_TASK; } - if (NULL != plugin->msg_head) { plugin->select_task = @@ -304,15 +367,36 @@ reschedule_select (struct Plugin * plugin) } } + +/** + * Closure to 'lookup_session_it'. + */ struct LookupCtx { + /** + * Location to store the session, if found. + */ struct Session *s; + + /** + * Address we are looking for. + */ const struct sockaddr_un *addr; }; -int lookup_session_it (void *cls, - const struct GNUNET_HashCode * key, - void *value) + +/** + * Function called to find a session by address. + * + * @param cls the 'struct LookupCtx' + * @param key peer we are looking for (unused) + * @param value a session + * @return GNUNET_YES if not found (continue looking), GNUNET_NO on success + */ +static int +lookup_session_it (void *cls, + const struct GNUNET_HashCode * key, + void *value) { struct LookupCtx *lctx = cls; struct Session *t = value; @@ -326,23 +410,33 @@ int lookup_session_it (void *cls, } +/** + * Find an existing session by address. + * + * @param plugin the plugin + * @param sender for which peer should the session be? + * @param addr address to look for + * @return NULL if session was not found + */ static struct Session * -lookup_session (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender, const struct sockaddr_un *addr) +lookup_session (struct Plugin *plugin, + const struct GNUNET_PeerIdentity *sender, + const struct sockaddr_un *addr) { struct LookupCtx lctx; GNUNET_assert (NULL != plugin); GNUNET_assert (NULL != sender); GNUNET_assert (NULL != addr); - lctx.s = NULL; lctx.addr = addr; - - GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map, &sender->hashPubKey, &lookup_session_it, &lctx); - + GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map, + &sender->hashPubKey, + &lookup_session_it, &lctx); return lctx.s; } + /** * Functions with this signature are called whenever we need * to close a session due to a disconnect or failure to @@ -353,18 +447,16 @@ lookup_session (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender, const static void disconnect_session (struct Session *s) { + struct Plugin *plugin = s->plugin; struct UNIXMessageWrapper *msgw; struct UNIXMessageWrapper *next; - struct Plugin * plugin = s->plugin; int removed; - GNUNET_assert (plugin != NULL); - GNUNET_assert (s != NULL); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting session for peer `%s' `%s' \n", GNUNET_i2s (&s->target), s->addr); - stop_session_timeout (s); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Disconnecting session for peer `%s' `%s'\n", + GNUNET_i2s (&s->target), + s->addr); plugin->env->session_end (plugin->env->cls, &s->target, s); - - msgw = plugin->msg_head; removed = GNUNET_NO; next = plugin->msg_head; while (NULL != next) @@ -381,85 +473,22 @@ disconnect_session (struct Session *s) GNUNET_free (msgw); removed = GNUNET_YES; } + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (plugin->session_map, + &s->target.hashPubKey, + s)); + GNUNET_STATISTICS_set (plugin->env->stats, + "# UNIX sessions active", + GNUNET_CONTAINER_multihashmap_size (plugin->session_map), + GNUNET_NO); if ((GNUNET_YES == removed) && (NULL == plugin->msg_head)) reschedule_select (plugin); - - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove(plugin->session_map, &s->target.hashPubKey, s)); - - GNUNET_STATISTICS_set(plugin->env->stats, - "# UNIX sessions active", - GNUNET_CONTAINER_multihashmap_size(plugin->session_map), - GNUNET_NO); - - GNUNET_free (s); -} - -static int -get_session_delete_it (void *cls, const struct GNUNET_HashCode * key, void *value) -{ - struct Session *s = value; - disconnect_session (s); - return GNUNET_YES; -} - - -/** - * Disconnect from a remote node. Clean up session if we have one for this peer - * - * @param cls closure for this call (should be handle to Plugin) - * @param target the peeridentity of the peer to disconnect - * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed - */ -static void -unix_disconnect (void *cls, const struct GNUNET_PeerIdentity *target) -{ - struct Plugin *plugin = cls; - GNUNET_assert (plugin != NULL); - - GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map, &target->hashPubKey, &get_session_delete_it, plugin); - return; -} - -/** - * Shutdown the server process (stop receiving inbound traffic). Maybe - * restarted later! - * - * @param cls Handle to the plugin for this transport - * - * @return returns the number of sockets successfully closed, - * should equal the number of sockets successfully opened - */ -static int -unix_transport_server_stop (void *cls) -{ - struct Plugin *plugin = cls; - struct UNIXMessageWrapper * msgw; - - while (NULL != (msgw = plugin->msg_head)) - { - GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw); - if (msgw->cont != NULL) - msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR, - msgw->payload, 0); - GNUNET_free (msgw->msg); - GNUNET_free (msgw); - } - - if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (plugin->select_task); - plugin->select_task = GNUNET_SCHEDULER_NO_TASK; - } - - if (NULL != plugin->unix_sock.desc) + if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task) { - GNUNET_break (GNUNET_OK == - GNUNET_NETWORK_socket_close (plugin->unix_sock.desc)); - plugin->unix_sock.desc = NULL; - plugin->with_ws = GNUNET_NO; + GNUNET_SCHEDULER_cancel (s->timeout_task); + s->timeout_task = GNUNET_SCHEDULER_NO_TASK; } - return GNUNET_OK; + GNUNET_free (s); } @@ -482,8 +511,7 @@ unix_transport_server_stop (void *cls) * for the next transmission call; or if the * peer disconnected...) * @param cont_cls closure for cont - * - * @return on success : the number of bytes written, 0 n retry, -1 on errors + * @return on success the number of bytes written, RETRY for retry, -1 on errors */ static ssize_t unix_real_send (void *cls, @@ -505,13 +533,12 @@ unix_real_send (void *cls, size_t slen; GNUNET_assert (NULL != plugin); - - if (send_handle == NULL) + if (NULL == send_handle) { GNUNET_break (0); /* We do not have a send handle */ return GNUNET_SYSERR; } - if ((addr == NULL) || (addrlen == 0)) + if ((NULL == addr) || (0 == addrlen)) { GNUNET_break (0); /* Can never send if we don't have an address */ return GNUNET_SYSERR; @@ -543,15 +570,10 @@ resend: if (GNUNET_SYSERR == sent) { - if (errno == EAGAIN) - { - return RETRY; /* We have to retry later */ - } - if (errno == ENOBUFS) - { + if ( (EAGAIN == errno) || + (ENOBUFS == errno) ) return RETRY; /* We have to retry later */ - } - if (errno == EMSGSIZE) + if (EMSGSIZE == errno) { socklen_t size = 0; socklen_t len = sizeof (size); @@ -587,29 +609,55 @@ resend: } LOG (GNUNET_ERROR_TYPE_DEBUG, - "UNIX transmit %u-byte message to %s (%d: %s)\n", - (unsigned int) msgbuf_size, GNUNET_a2s (sb, sbs), (int) sent, - (sent < 0) ? STRERROR (errno) : "ok"); + "UNIX transmit %u-byte message to %s (%d: %s)\n", + (unsigned int) msgbuf_size, + GNUNET_a2s (sb, sbs), + (int) sent, + (sent < 0) ? STRERROR (errno) : "ok"); return sent; } -struct gsi_ctx -{ - char *address; - size_t addrlen; + +/** + * Closure for 'get_session_it'. + */ +struct GetSessionIteratorContext +{ + /** + * Location to store the session, if found. + */ struct Session *res; + + /** + * Address information. + */ + const char *address; + + /** + * Number of bytes in 'address' + */ + size_t addrlen; }; +/** + * Function called to find a session by address. + * + * @param cls the 'struct LookupCtx' + * @param key peer we are looking for (unused) + * @param value a session + * @return GNUNET_YES if not found (continue looking), GNUNET_NO on success + */ static int -get_session_it (void *cls, const struct GNUNET_HashCode * key, void *value) +get_session_it (void *cls, + const struct GNUNET_HashCode *key, + void *value) { - struct gsi_ctx *gsi = cls; + struct GetSessionIteratorContext *gsi = cls; struct Session *s = value; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Comparing session %s %s\n", gsi->address, s->addr); - if ((gsi->addrlen == s->addrlen) && - (0 == memcmp (gsi->address, s->addr, s->addrlen))) + if ( (gsi->addrlen == s->addrlen) && + (0 == memcmp (gsi->address, s->addr, s->addrlen)) ) { gsi->res = s; return GNUNET_NO; @@ -617,6 +665,29 @@ get_session_it (void *cls, const struct GNUNET_HashCode * key, void *value) return GNUNET_YES; } + +/** + * Session was idle for too long, so disconnect it + * + * @param cls the 'struct Session' to disconnect + * @param tc scheduler context + */ +static void +session_timeout (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct Session *s = cls; + + s->timeout_task = GNUNET_SCHEDULER_NO_TASK; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Session %p was idle for %s, disconnecting\n", + s, + GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + GNUNET_YES)); + disconnect_session (s); +} + + /** * Creates a new outbound session the transport service will use to send data to the * peer @@ -627,60 +698,56 @@ get_session_it (void *cls, const struct GNUNET_HashCode * key, void *value) */ static struct Session * unix_plugin_get_session (void *cls, - const struct GNUNET_HELLO_Address *address) + const struct GNUNET_HELLO_Address *address) { - struct Session * s = NULL; struct Plugin *plugin = cls; - struct gsi_ctx gsi; + struct Session *s; + struct GetSessionIteratorContext gsi; - /* Checks */ - GNUNET_assert (plugin != NULL); - GNUNET_assert (address != NULL); + GNUNET_assert (NULL != plugin); + GNUNET_assert (NULL != address); /* Check if already existing */ - gsi.address = (char *) address->address; + gsi.address = (const char *) address->address; gsi.addrlen = address->address_length; gsi.res = NULL; - GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map, &address->peer.hashPubKey, &get_session_it, &gsi); - if (gsi.res != NULL) + GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map, + &address->peer.hashPubKey, + &get_session_it, &gsi); + if (NULL != gsi.res) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Found existing session\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Found existing session\n"); return gsi.res; } - /* Create a new session */ + /* create a new session */ s = GNUNET_malloc (sizeof (struct Session) + address->address_length); s->addr = &s[1]; s->addrlen = address->address_length; s->plugin = plugin; - memcpy(s->addr, address->address, s->addrlen); - memcpy(&s->target, &address->peer, sizeof (struct GNUNET_PeerIdentity)); + memcpy (s->addr, address->address, s->addrlen); + memcpy (&s->target, &address->peer, sizeof (struct GNUNET_PeerIdentity)); - start_session_timeout (s); - - GNUNET_CONTAINER_multihashmap_put (plugin->session_map, - &address->peer.hashPubKey, s, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - - GNUNET_STATISTICS_set(plugin->env->stats, - "# UNIX sessions active", - GNUNET_CONTAINER_multihashmap_size(plugin->session_map), - GNUNET_NO); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating new session\n"); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task); + s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + &session_timeout, + s); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating a new session %p with timeout set to %s\n", + s, + GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + GNUNET_YES)); + (void) GNUNET_CONTAINER_multihashmap_put (plugin->session_map, + &address->peer.hashPubKey, s, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + GNUNET_STATISTICS_set (plugin->env->stats, + "# UNIX sessions active", + GNUNET_CONTAINER_multihashmap_size (plugin->session_map), + GNUNET_NO); return s; } -/* - * @param cls the plugin handle - * @param tc the scheduling context (for rescheduling this function again) - * - * We have been notified that our writeset has something to read. We don't - * know which socket needs to be read, so we have to check each one - * Then reschedule this function to be called again once more is available. - * - */ -static void -unix_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); /** * Function that can be used by the transport service to transmit @@ -722,24 +789,26 @@ unix_plugin_send (void *cls, struct UNIXMessage *message; int ssize; - GNUNET_assert (plugin != NULL); - GNUNET_assert (session != NULL); + GNUNET_assert (NULL != plugin); + GNUNET_assert (NULL != session); - if (GNUNET_OK != GNUNET_CONTAINER_multihashmap_contains_value(plugin->session_map, - &session->target.hashPubKey, session)) + if (GNUNET_OK != + GNUNET_CONTAINER_multihashmap_contains_value (plugin->session_map, + &session->target.hashPubKey, + session)) { - LOG (GNUNET_ERROR_TYPE_ERROR, "Invalid session for peer `%s' `%s'\n", - GNUNET_i2s (&session->target), - (char *) session->addr); + LOG (GNUNET_ERROR_TYPE_ERROR, + "Invalid session for peer `%s' `%s'\n", + GNUNET_i2s (&session->target), + (const char *) session->addr); GNUNET_break (0); - return GNUNET_SYSERR; } - LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending %u bytes with session for peer `%s' `%s'\n", - msgbuf_size, - GNUNET_i2s (&session->target), - (char *) session->addr); - + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sending %u bytes with session for peer `%s' `%s'\n", + msgbuf_size, + GNUNET_i2s (&session->target), + (const char *) session->addr); ssize = sizeof (struct UNIXMessage) + msgbuf_size; message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size); message->header.size = htons (ssize); @@ -747,9 +816,7 @@ unix_plugin_send (void *cls, memcpy (&message->sender, plugin->env->my_identity, sizeof (struct GNUNET_PeerIdentity)); memcpy (&message[1], msgbuf, msgbuf_size); - reschedule_session_timeout (session); - wrapper = GNUNET_malloc (sizeof (struct UNIXMessageWrapper)); wrapper->msg = message; wrapper->msgsize = ssize; @@ -759,20 +826,16 @@ unix_plugin_send (void *cls, wrapper->cont = cont; wrapper->cont_cls = cont_cls; wrapper->session = session; - - GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, wrapper); - + GNUNET_CONTAINER_DLL_insert (plugin->msg_head, + plugin->msg_tail, + wrapper); plugin->bytes_in_queue += ssize; - GNUNET_STATISTICS_set (plugin->env->stats,"# bytes currently in UNIX buffers", - plugin->bytes_in_queue, GNUNET_NO); - - LOG (GNUNET_ERROR_TYPE_DEBUG, "Sent %d bytes to `%s'\n", ssize, - (char *) session->addr); - if (plugin->with_ws == GNUNET_NO) - { + GNUNET_STATISTICS_set (plugin->env->stats, + "# bytes currently in UNIX buffers", + plugin->bytes_in_queue, + GNUNET_NO); + if (GNUNET_NO == plugin->with_ws) reschedule_select (plugin); - } - return ssize; } @@ -798,16 +861,18 @@ unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender, GNUNET_assert (fromlen >= sizeof (struct sockaddr_un)); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from %s\n", - un->sun_path); - - - - plugin->bytes_in_recv += ntohs(currhdr->size); - GNUNET_STATISTICS_set (plugin->env->stats,"# bytes received via UNIX", - plugin->bytes_in_recv, GNUNET_NO); - - addr = GNUNET_HELLO_address_allocate(sender, "unix", un->sun_path, strlen (un->sun_path) + 1); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received message from %s\n", + un->sun_path); + GNUNET_STATISTICS_update (plugin->env->stats, + "# bytes received via UNIX", + ntohs (currhdr->size), + GNUNET_NO); + + addr = GNUNET_HELLO_address_allocate (sender, + "unix", + un->sun_path, + strlen (un->sun_path) + 1); s = lookup_session (plugin, sender, un); if (NULL == s) s = unix_plugin_get_session (plugin, addr); @@ -827,8 +892,13 @@ unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender, } +/** + * Read from UNIX domain socket (it is ready). + * + * @param plugin the plugin + */ static void -unix_plugin_select_read (struct Plugin * plugin) +unix_plugin_select_read (struct Plugin *plugin) { char buf[65536] GNUNET_ALIGN; struct UNIXMessage *msg; @@ -889,44 +959,48 @@ unix_plugin_select_read (struct Plugin * plugin) GNUNET_break_op (0); break; } - unix_demultiplexer (plugin, &sender, currhdr, &un, sizeof (un)); offset += csize; } } +/** + * Write to UNIX domain socket (it is ready). + * + * @param plugin the plugin + */ static void -unix_plugin_select_write (struct Plugin * plugin) +unix_plugin_select_write (struct Plugin *plugin) { int sent = 0; + struct UNIXMessageWrapper * msgw; - struct UNIXMessageWrapper * msgw = plugin->msg_tail; - while (NULL != msgw) + while (NULL != (msgw = plugin->msg_tail)) { - if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value > 0) - break; /* Message is ready for sending */ - else - { - /* Message has a timeout */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Timeout for message with %llu bytes \n", msgw->msgsize); - GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw); - if (NULL != msgw->cont) - msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR, msgw->payload, 0); - - plugin->bytes_in_queue -= msgw->msgsize; - GNUNET_STATISTICS_set (plugin->env->stats, "# bytes currently in UNIX buffers", - plugin->bytes_in_queue, GNUNET_NO); - - plugin->bytes_discarded += msgw->msgsize; - GNUNET_STATISTICS_set (plugin->env->stats,"# UNIX bytes discarded", - plugin->bytes_discarded, GNUNET_NO); - - GNUNET_free (msgw->msg); - GNUNET_free (msgw); - } - msgw = plugin->msg_tail; + if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value > 0) + break; /* Message is ready for sending */ + /* Message has a timeout */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Timeout for message with %u bytes \n", + (unsigned int) msgw->msgsize); + GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw); + plugin->bytes_in_queue -= msgw->msgsize; + GNUNET_STATISTICS_set (plugin->env->stats, + "# bytes currently in UNIX buffers", + plugin->bytes_in_queue, GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UNIX bytes discarded", + msgw->msgsize, + GNUNET_NO); + if (NULL != msgw->cont) + msgw->cont (msgw->cont_cls, + &msgw->session->target, + GNUNET_SYSERR, + msgw->payload, + 0); + GNUNET_free (msgw->msg); + GNUNET_free (msgw); } if (NULL == msgw) return; /* Nothing to send at the moment */ @@ -945,11 +1019,12 @@ unix_plugin_select_write (struct Plugin * plugin) if (RETRY == sent) { - GNUNET_STATISTICS_update (plugin->env->stats,"# UNIX retry attempts", - 1, GNUNET_NO); - + GNUNET_STATISTICS_update (plugin->env->stats, + "# UNIX retry attempts", + 1, GNUNET_NO); + return; } - else if (GNUNET_SYSERR == sent) + if (GNUNET_SYSERR == sent) { /* failed and no retry */ if (NULL != msgw->cont) @@ -959,36 +1034,40 @@ unix_plugin_select_write (struct Plugin * plugin) GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); plugin->bytes_in_queue -= msgw->msgsize; - GNUNET_STATISTICS_set (plugin->env->stats, "# bytes currently in UNIX buffers", - plugin->bytes_in_queue, GNUNET_NO); - plugin->bytes_discarded += msgw->msgsize; - GNUNET_STATISTICS_set (plugin->env->stats,"# UNIX bytes discarded", - plugin->bytes_discarded, GNUNET_NO); - - GNUNET_free (msgw->msg); - GNUNET_free (msgw); - return; - } - else if (sent > 0) - { - /* successfully sent bytes */ - if (NULL != msgw->cont) - msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_OK, msgw->payload, msgw->msgsize); - - GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw); - - GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); - plugin->bytes_in_queue -= msgw->msgsize; - GNUNET_STATISTICS_set (plugin->env->stats,"# bytes currently in UNIX buffers", - plugin->bytes_in_queue, GNUNET_NO); - plugin->bytes_in_sent += msgw->msgsize; - GNUNET_STATISTICS_set (plugin->env->stats,"# bytes transmitted via UNIX", - plugin->bytes_in_sent, GNUNET_NO); + GNUNET_STATISTICS_set (plugin->env->stats, + "# bytes currently in UNIX buffers", + plugin->bytes_in_queue, GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# UNIX bytes discarded", + msgw->msgsize, + GNUNET_NO); GNUNET_free (msgw->msg); GNUNET_free (msgw); return; } + /* successfully sent bytes */ + GNUNET_break (sent > 0); + GNUNET_CONTAINER_DLL_remove (plugin->msg_head, + plugin->msg_tail, + msgw); + GNUNET_assert (plugin->bytes_in_queue >= msgw->msgsize); + plugin->bytes_in_queue -= msgw->msgsize; + GNUNET_STATISTICS_set (plugin->env->stats, + "# bytes currently in UNIX buffers", + plugin->bytes_in_queue, + GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# bytes transmitted via UNIX", + msgw->msgsize, + GNUNET_NO); + if (NULL != msgw->cont) + msgw->cont (msgw->cont_cls, &msgw->session->target, + GNUNET_OK, + msgw->payload, + msgw->msgsize); + GNUNET_free (msgw->msg); + GNUNET_free (msgw); } @@ -1001,7 +1080,8 @@ unix_plugin_select_write (struct Plugin * plugin) * @param tc the scheduling context (for rescheduling this function again) */ static void -unix_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +unix_plugin_select (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct Plugin *plugin = cls; @@ -1014,7 +1094,7 @@ unix_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /* Ready to send data */ GNUNET_assert (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->unix_sock.desc)); - if (plugin->msg_head != NULL) + if (NULL != plugin->msg_head) unix_plugin_select_write (plugin); } @@ -1025,7 +1105,6 @@ unix_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) (tc->read_ready, plugin->unix_sock.desc)); unix_plugin_select_read (plugin); } - reschedule_select (plugin); } @@ -1221,10 +1300,12 @@ unix_address_to_string (void *cls, const void *addr, size_t addrlen) * @param tc unused */ static void -address_notification (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +address_notification (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct Plugin *plugin = cls; + plugin->address_update_task = GNUNET_SCHEDULER_NO_TASK; plugin->env->notify_address (plugin->env->cls, GNUNET_YES, plugin->unix_socket_path, strlen (plugin->unix_socket_path) + 1, @@ -1233,80 +1314,71 @@ address_notification (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /** - * Session was idle, so disconnect it - */ -static void -session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - GNUNET_assert (NULL != cls); - struct Session *s = cls; - - s->timeout_task = GNUNET_SCHEDULER_NO_TASK; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Session %p was idle for %llu ms, disconnecting\n", - s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); - /* call session destroy function */ - disconnect_session(s); -} - - -/** - * Start session timeout + * Increment session timeout due to activity + * + * @param s session for which the timeout should be moved */ static void -start_session_timeout (struct Session *s) +reschedule_session_timeout (struct Session *s) { GNUNET_assert (NULL != s); - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task); + GNUNET_SCHEDULER_cancel (s->timeout_task); s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &session_timeout, s); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Timeout for session %p set to %llu ms\n", - s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); + "Timeout rescheduled for session %p set to %s\n", + s, + GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + GNUNET_YES)); } /** - * Increment session timeout due to activity + * Function called on sessions to disconnect + * + * @param cls the plugin (unused) + * @param key peer identity (unused) + * @param value the 'struct Session' to disconnect + * @return GNUNET_YES (always, continue to iterate) */ -static void -reschedule_session_timeout (struct Session *s) +static int +get_session_delete_it (void *cls, const struct GNUNET_HashCode * key, void *value) { - GNUNET_assert (NULL != s); - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task); + struct Session *s = value; - GNUNET_SCHEDULER_cancel (s->timeout_task); - s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &session_timeout, - s); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Timeout rescheduled for session %p set to %llu ms\n", - s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); + disconnect_session (s); + return GNUNET_YES; } /** - * Cancel timeout + * Disconnect from a remote node. Clean up session if we have one for this peer + * + * @param cls closure for this call (should be handle to Plugin) + * @param target the peeridentity of the peer to disconnect + * @return GNUNET_OK on success, GNUNET_SYSERR if the operation failed */ static void -stop_session_timeout (struct Session *s) +unix_disconnect (void *cls, + const struct GNUNET_PeerIdentity *target) { - GNUNET_assert (NULL != s); + struct Plugin *plugin = cls; - if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task) - { - GNUNET_SCHEDULER_cancel (s->timeout_task); - s->timeout_task = GNUNET_SCHEDULER_NO_TASK; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Timeout stopped for session %p canceled\n", - s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value); - } + GNUNET_assert (plugin != NULL); + GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map, + &target->hashPubKey, + &get_session_delete_it, plugin); } + /** - * The exported method. Makes the core api available via a global and - * returns the unix transport API. + * The exported method. Initializes the plugin and returns a + * struct with the callbacks. + * + * @param cls the plugin's execution environment + * @return NULL on error, plugin functions otherwise */ void * libgnunet_plugin_transport_unix_init (void *cls) @@ -1335,7 +1407,8 @@ libgnunet_plugin_transport_unix_init (void *cls) plugin = GNUNET_malloc (sizeof (struct Plugin)); plugin->port = port; plugin->env = env; - GNUNET_asprintf (&plugin->unix_socket_path, "/tmp/unix-plugin-sock.%d", + GNUNET_asprintf (&plugin->unix_socket_path, + "/tmp/unix-plugin-sock.%d", plugin->port); api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions)); @@ -1349,39 +1422,67 @@ libgnunet_plugin_transport_unix_init (void *cls) api->check_address = &unix_check_address; api->string_to_address = &unix_string_to_address; sockets_created = unix_transport_server_start (plugin); - if (sockets_created == 0) - LOG (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UNIX sockets\n")); - + if (0 == sockets_created) + LOG (GNUNET_ERROR_TYPE_WARNING, + _("Failed to open UNIX listen socket\n")); plugin->session_map = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO); - - GNUNET_SCHEDULER_add_now (address_notification, plugin); + plugin->address_update_task = GNUNET_SCHEDULER_add_now (&address_notification, plugin); return api; } + +/** + * Shutdown the plugin. + * + * @param cls the plugin API returned from the initialization function + * @return NULL (always) + */ void * libgnunet_plugin_transport_unix_done (void *cls) { struct GNUNET_TRANSPORT_PluginFunctions *api = cls; struct Plugin *plugin = api->cls; + struct UNIXMessageWrapper * msgw; if (NULL == plugin) { GNUNET_free (api); return NULL; } - plugin->env->notify_address (plugin->env->cls, GNUNET_NO, plugin->unix_socket_path, strlen (plugin->unix_socket_path) + 1, "unix"); + while (NULL != (msgw = plugin->msg_head)) + { + GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw); + if (msgw->cont != NULL) + msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR, + msgw->payload, 0); + GNUNET_free (msgw->msg); + GNUNET_free (msgw); + } - unix_transport_server_stop (plugin); - - - GNUNET_CONTAINER_multihashmap_iterate (plugin->session_map, &get_session_delete_it, plugin); + if (GNUNET_SCHEDULER_NO_TASK != plugin->select_task) + { + GNUNET_SCHEDULER_cancel (plugin->select_task); + plugin->select_task = GNUNET_SCHEDULER_NO_TASK; + } + if (GNUNET_SCHEDULER_NO_TASK != plugin->address_update_task) + { + GNUNET_SCHEDULER_cancel (plugin->address_update_task); + plugin->address_update_task = GNUNET_SCHEDULER_NO_TASK; + } + if (NULL != plugin->unix_sock.desc) + { + GNUNET_break (GNUNET_OK == + GNUNET_NETWORK_socket_close (plugin->unix_sock.desc)); + plugin->unix_sock.desc = NULL; + plugin->with_ws = GNUNET_NO; + } + GNUNET_CONTAINER_multihashmap_iterate (plugin->session_map, + &get_session_delete_it, plugin); GNUNET_CONTAINER_multihashmap_destroy (plugin->session_map); - - if (NULL != plugin->rs) GNUNET_NETWORK_fdset_destroy (plugin->rs); if (NULL != plugin->ws) -- 2.25.1