Free Software Foundation, Inc., 59 Temple Place - Suite 330,
Boston, MA 02111-1307, USA.
*/
-
/**
* @file transport/plugin_transport_tcp.c
* @brief Implementation of the TCP transport service
* @author Christian Grothoff
*/
-
#include "platform.h"
#include "gnunet_hello_lib.h"
#include "gnunet_connection_lib.h"
*/
int expecting_welcome;
+ /**
+ * Was this a connection that was inbound (we accepted)? (GNUNET_YES/GNUNET_NO)
+ */
+ int inbound;
+
};
do_transmit (void *cls, size_t size, void *buf)
{
struct Session *session = cls;
- struct PendingMessage *pm;
+ struct GNUNET_PeerIdentity pid;
+ struct Plugin *plugin;
+ struct PendingMessage *pos;
+ struct PendingMessage *hd;
+ struct PendingMessage *tl;
+ struct GNUNET_TIME_Absolute now;
char *cbuf;
-
size_t ret;
session->transmit_handle = NULL;
+ plugin = session->plugin;
if (buf == NULL)
{
#if DEBUG_TCP
"Timeout trying to transmit to peer `%4s', discarding message queue.\n",
GNUNET_i2s (&session->target));
#endif
- /* timeout */
- while (NULL != (pm = session->pending_messages_head))
- {
+ /* timeout; cancel all messages that have already expired */
+ hd = NULL;
+ tl = NULL;
+ ret = 0;
+ now = GNUNET_TIME_absolute_get ();
+ while ( (NULL != (pos = session->pending_messages_head)) &&
+ (pos->timeout.value <= now.value) )
+ {
GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
session->pending_messages_tail,
- pm);
+ pos);
#if DEBUG_TCP
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
"tcp",
"Failed to transmit %u byte message to `%4s'.\n",
- pm->message_size,
+ pos->message_size,
GNUNET_i2s (&session->target));
#endif
- GNUNET_STATISTICS_update (session->plugin->env->stats,
- gettext_noop ("# bytes currently in TCP buffers"),
- - (int64_t) pm->message_size,
- GNUNET_NO);
- GNUNET_STATISTICS_update (session->plugin->env->stats,
- gettext_noop ("# bytes discarded by TCP (timeout)"),
- pm->message_size,
- GNUNET_NO);
- if (pm->transmit_cont != NULL)
- pm->transmit_cont (pm->transmit_cont_cls,
- &session->target, GNUNET_SYSERR);
- GNUNET_free (pm);
+ ret += pos->message_size;
+ GNUNET_CONTAINER_DLL_insert_after (hd, tl, tl, pos);
}
+ /* do this call before callbacks (so that if callbacks destroy
+ session, they have a chance to cancel actions done by this
+ call) */
+ process_pending_messages (session);
+ pid = session->target;
+ /* no do callbacks and do not use session again since
+ the callbacks may abort the session */
+ while (NULL != (pos = hd))
+ {
+ GNUNET_CONTAINER_DLL_remove (hd, tl, pos);
+ if (pos->transmit_cont != NULL)
+ pos->transmit_cont (pos->transmit_cont_cls,
+ &pid, GNUNET_SYSERR);
+ GNUNET_free (pos);
+ }
+ GNUNET_STATISTICS_update (plugin->env->stats,
+ gettext_noop ("# bytes currently in TCP buffers"),
+ - (int64_t) ret,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (plugin->env->stats,
+ gettext_noop ("# bytes discarded by TCP (timeout)"),
+ ret,
+ GNUNET_NO);
return 0;
}
+ /* copy all pending messages that would fit */
ret = 0;
cbuf = buf;
- while (NULL != (pm = session->pending_messages_head))
+ hd = NULL;
+ tl = NULL;
+ while (NULL != (pos = session->pending_messages_head))
{
- if (size < pm->message_size)
+ if (ret + pos->message_size > size)
break;
- memcpy (cbuf, pm->msg, pm->message_size);
- cbuf += pm->message_size;
- ret += pm->message_size;
- size -= pm->message_size;
GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
session->pending_messages_tail,
- pm);
- GNUNET_STATISTICS_update (session->plugin->env->stats,
- gettext_noop ("# bytes currently in TCP buffers"),
- - (int64_t) pm->message_size,
- GNUNET_NO);
- if (pm->transmit_cont != NULL)
- pm->transmit_cont (pm->transmit_cont_cls,
- &session->target, GNUNET_OK);
- GNUNET_free (pm);
+ pos);
+ GNUNET_assert (size >= pos->message_size);
+ memcpy (cbuf, pos->msg, pos->message_size);
+ cbuf += pos->message_size;
+ ret += pos->message_size;
+ size -= pos->message_size;
+ GNUNET_CONTAINER_DLL_insert_after (hd, tl, tl, pos);
+ }
+ /* schedule 'continuation' before callbacks so that callbacks that
+ cancel everything don't cause us to use a session that no longer
+ exists... */
+ process_pending_messages (session);
+ pid = session->target;
+ /* we'll now call callbacks that may cancel the session; hence
+ we should not use 'session' after this point */
+ while (NULL != (pos = hd))
+ {
+ GNUNET_CONTAINER_DLL_remove (hd, tl, pos);
+ if (pos->transmit_cont != NULL)
+ pos->transmit_cont (pos->transmit_cont_cls,
+ &pid, GNUNET_OK);
+ GNUNET_free (pos);
}
- if (session->client != NULL)
- process_pending_messages (session);
+ GNUNET_assert (hd == NULL);
+ GNUNET_assert (tl == NULL);
#if DEBUG_TCP > 1
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
"tcp", "Transmitting %u bytes\n", ret);
#endif
- GNUNET_STATISTICS_update (session->plugin->env->stats,
+ GNUNET_STATISTICS_update (plugin->env->stats,
+ gettext_noop ("# bytes currently in TCP buffers"),
+ - (int64_t) ret,
+ GNUNET_NO);
+ GNUNET_STATISTICS_update (plugin->env->stats,
gettext_noop ("# bytes transmitted via TCP"),
ret,
GNUNET_NO);
process_pending_messages (struct Session *session)
{
struct PendingMessage *pm;
+
GNUNET_assert (session->client != NULL);
if (session->transmit_handle != NULL)
return;
&session->target, GNUNET_SYSERR);
GNUNET_free (pm);
}
- if (GNUNET_NO == session->expecting_welcome)
- {
-#if DEBUG_TCP
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
- "tcp",
- "Notifying transport service about loss of connection with `%4s'.\n",
- GNUNET_i2s (&session->target));
-#endif
- /* Data session that actually went past the initial handshake;
- transport service may know about this one, so we need to
- notify transport service about disconnect */
- // FIXME: we should have a very clear connect-disconnect
- // protocol with gnunet-service-transport!
- // FIXME: but this is not possible for all plugins, so what gives?
- }
+ GNUNET_break (session->client != NULL);
if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (session->plugin->env->sched,
session->receive_delay_task);
- if (session->client != NULL)
- {
- GNUNET_SERVER_receive_done (session->client,
- GNUNET_SYSERR);
- }
+ GNUNET_SERVER_receive_done (session->client,
+ GNUNET_SYSERR);
}
- if (session->client != NULL)
- {
- GNUNET_SERVER_client_drop (session->client);
- session->client = NULL;
- }
+ GNUNET_SERVER_client_drop (session->client);
GNUNET_STATISTICS_update (session->plugin->env->stats,
gettext_noop ("# TCP sessions active"),
-1,
* is "on its own" (i.e. re-use existing TCP connection))
* @param addrlen length of the address in bytes
* @param force_address GNUNET_YES if the plugin MUST use the given address,
- * otherwise the plugin may use other addresses or
- * existing connections (if available)
+ * GNUNET_NO means the plugin may use any other address and
+ * GNUNET_SYSERR means that only reliable existing
+ * bi-directional connections should be used (regardless
+ * of address)
* @param cont continuation to call once the message has
* been transmitted (or if the transport is ready
* for the next transmission call; or if the
{
struct Plugin *plugin = cls;
struct Session *session;
+ struct Session *next;
struct PendingMessage *pm;
struct GNUNET_CONNECTION_Handle *sa;
int af;
gettext_noop ("# bytes TCP was asked to transmit"),
msgbuf_size,
GNUNET_NO);
- session = plugin->sessions;
/* FIXME: we could do this a cheaper with a hash table
where we could restrict the iteration to entries that match
the target peer... */
- while ( (session != NULL) &&
- ( (session->client == NULL) ||
- (0 != memcmp (target,
- &session->target,
- sizeof (struct GNUNET_PeerIdentity))) ||
- ( (GNUNET_YES == force_address) &&
- (addr != NULL) &&
- ( (addrlen != session->connect_alen) ||
- (0 != memcmp (session->connect_addr,
- addr,
- addrlen)) ) ) ) )
- session = session->next;
+ next = plugin->sessions;
+ while (NULL != (session = next))
+ {
+ next = session->next;
+ if (session->client == NULL)
+ continue;
+ if (0 != memcmp (target,
+ &session->target,
+ sizeof (struct GNUNET_PeerIdentity)))
+ continue;
+ if (GNUNET_SYSERR == force_address)
+ {
+ if (session->expecting_welcome == GNUNET_NO)
+ break; /* established and reliable (TCP!) */
+ else
+ continue; /* not established */
+ }
+ if (GNUNET_NO == force_address)
+ break;
+ GNUNET_break (GNUNET_YES == force_address);
+ if (addr == NULL)
+ {
+ GNUNET_break (0);
+ break;
+ }
+ if (session->inbound == GNUNET_YES)
+ continue;
+ if (addrlen != session->connect_alen)
+ continue;
+ if (0 == memcmp (session->connect_addr,
+ addr,
+ addrlen))
+ break;
+ }
if ( (session == NULL) &&
(addr == NULL) )
{
if (session == NULL)
{
if (sizeof (struct sockaddr_in) == addrlen)
- af = AF_INET;
+ {
+ af = AF_INET;
+ }
else if (sizeof (struct sockaddr_in6) == addrlen)
- af = AF_INET6;
+ {
+ af = AF_INET6;
+ }
else
{
GNUNET_break_op (0);
GNUNET_NO);
return -1;
}
-
#if DEBUG_TCP
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
"tcp",
}
GNUNET_assert (session != NULL);
GNUNET_assert (session->client != NULL);
-
GNUNET_STATISTICS_update (plugin->env->stats,
gettext_noop ("# bytes currently in TCP buffers"),
msgbuf_size,
* to be cancelled
*/
static void
-tcp_plugin_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
+tcp_plugin_disconnect (void *cls,
+ const struct GNUNET_PeerIdentity *target)
{
struct Plugin *plugin = cls;
struct Session *session;
+ struct Session *next;
struct PendingMessage *pm;
#if DEBUG_TCP
"Asked to cancel session with `%4s'\n",
GNUNET_i2s (target));
#endif
- session = plugin->sessions;
- while (NULL != session)
+ next = plugin->sessions;
+ while (NULL != (session = next))
{
- if (0 == memcmp (target,
+ next = session->next;
+ if (0 != memcmp (target,
&session->target,
sizeof (struct GNUNET_PeerIdentity)))
+ continue;
+ pm = session->pending_messages_head;
+ while (pm != NULL)
{
- pm = session->pending_messages_head;
- while (pm != NULL)
- {
- pm->transmit_cont = NULL;
- pm->transmit_cont_cls = NULL;
- pm = pm->next;
- }
- if (session->client != NULL)
- {
- GNUNET_SERVER_client_drop (session->client);
- session->client = NULL;
- }
- /* rest of the clean-up of the session will be done as part of
- disconnect_notify which should be triggered any time now
- (or which may be triggering this call in the first place) */
+ pm->transmit_cont = NULL;
+ pm->transmit_cont_cls = NULL;
+ pm = pm->next;
}
- session = session->next;
+ disconnect_session (session);
}
}
+/**
+ * Context for address to string conversion.
+ */
struct PrettyPrinterContext
{
+ /**
+ * Function to call with the result.
+ */
GNUNET_TRANSPORT_AddressStringCallback asc;
+
+ /**
+ * Clsoure for 'asc'.
+ */
void *asc_cls;
+
+ /**
+ * Port to add after the IP address.
+ */
uint16_t port;
};
/**
* Append our port and forward the result.
+ *
+ * @param cls the 'struct PrettyPrinterContext*'
+ * @param hostname hostname part of the address
*/
static void
append_port (void *cls, const char *hostname)
* our listen port or our advertised port). If it is
* neither, we return one of these two ports at random.
*
+ * @param plugin global variables
+ * @param in_port port number to check
* @return either in_port or a more plausible port
*/
static uint16_t
* Another peer has suggested an address for this peer and transport
* plugin. Check that this could be a valid address.
*
- * @param cls closure
+ * @param cls closure, our 'struct Plugin*'
* @param addr pointer to the address
* @param addrlen length of addr
* @return GNUNET_OK if this is a plausible address for this peer
GNUNET_SERVER_client_keep (client);
session = create_session (plugin,
&wm->clientIdentity, client);
+ session->inbound = GNUNET_YES;
if (GNUNET_OK ==
GNUNET_SERVER_client_get_address (client, &vaddr, &alen))
{
/**
* Task to signal the server that we can continue
* receiving from the TCP client now.
+ *
+ * @param cls the 'struct Session*'
+ * @param tc task context (unused)
*/
static void
delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
}
if ( (NULL == session) || (GNUNET_NO != session->expecting_welcome))
{
- GNUNET_break_op (0);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
if (aport != bport)
GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
"tcp",
- _
- ("TCP transport advertises itself as being on port %llu\n"),
+ _("TCP transport advertises itself as being on port %llu\n"),
aport);
- GNUNET_SERVER_disconnect_notify (plugin->server, &disconnect_notify,
+ GNUNET_SERVER_disconnect_notify (plugin->server,
+ &disconnect_notify,
plugin);
/* FIXME: do the two calls below periodically again and
not just once (since the info we get might change...) */