-makefile for new test_stream_local (commented)
[oweals/gnunet.git] / src / transport / plugin_transport_tcp.c
index 2b2d7289e56f624c48c7e4fbe8874476301a8f79..9831070dd5e759adc0b67608f2e19f56d5210bb9 100644 (file)
 
 #define DEBUG_TCP_NAT GNUNET_EXTRA_LOGGING
 
+
+/**
+ * How long until we give up on establishing an NAT connection?
+ * Must be > 4 RTT
+ */
+#define NAT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
+
+
 GNUNET_NETWORK_STRUCT_BEGIN
 
 /**
@@ -239,6 +247,11 @@ struct Session
    */
   struct GNUNET_SERVER_Client *client;
 
+  /**
+   * Task cleaning up a NAT client connection establishment attempt;
+   */
+  GNUNET_SCHEDULER_TaskIdentifier nat_connection_timeout;
+
   /**
    * Messages currently pending for transmission
    * to this peer, if any.
@@ -390,6 +403,44 @@ struct Plugin
 
 };
 
+/* DEBUG CODE */
+static const char *
+tcp_address_to_string (void *cls, const void *addr, size_t addrlen);
+
+static unsigned int sessions;
+
+static void inc_sessions (struct Plugin *plugin, struct Session *session, int line)
+{
+  sessions ++;
+  unsigned int size = GNUNET_CONTAINER_multihashmap_size(plugin->sessionmap);
+  if (sessions != size)
+    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", "Inconsistent sessions %u <-> session map size: %u\n",
+        sessions, size);
+  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", "%4i Session increased to %u (session map size: %u): `%s' `%s'\n",
+      line,
+      sessions,
+      size,
+      GNUNET_i2s (&session->target),
+      tcp_address_to_string (NULL, session->addr, session->addrlen));
+}
+
+static void dec_sessions (struct Plugin *plugin, struct Session *session, int line)
+{
+  GNUNET_assert (sessions > 0);
+  unsigned int size = GNUNET_CONTAINER_multihashmap_size(plugin->sessionmap);
+  sessions --;
+  if (sessions != size)
+    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", "Inconsistent sessions %u <-> session map size: %u\n",
+      sessions, size);
+  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", "%4i Session decreased to %u (session map size: %u): `%s' `%s'\n",
+      line,
+      sessions,
+      size,
+      GNUNET_i2s (&session->target),
+      tcp_address_to_string (NULL, session->addr, session->addrlen));
+}
+/* DEBUG CODE */
+
 
 /**
  * Function to check if an inbound connection is acceptable.
@@ -525,6 +576,53 @@ tcp_address_to_string (void *cls, const void *addr, size_t addrlen)
 }
 
 
+/**
+ * Function called to convert a string address to
+ * a binary address.
+ *
+ * @param cls closure ('struct Plugin*')
+ * @param addr string address
+ * @param addrlen length of the address
+ * @param buf location to store the buffer
+ * @param added location to store the number of bytes in the buffer.
+ *        If the function returns GNUNET_SYSERR, its contents are undefined.
+ * @return GNUNET_OK on success, GNUNET_SYSERR on failure
+ */
+int
+tcp_string_to_address (void *cls, const char *addr, uint16_t addrlen,
+    void **buf, size_t *added)
+{
+  struct sockaddr_storage socket_address;
+  int ret = GNUNET_STRINGS_to_address_ip (addr, addrlen,
+    &socket_address);
+
+  if (ret != GNUNET_OK)
+    return GNUNET_SYSERR;
+
+  if (socket_address.ss_family == AF_INET)
+  {
+    struct IPv4TcpAddress *t4;
+    struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
+    t4 = GNUNET_malloc (sizeof (struct IPv4TcpAddress));
+    t4->ipv4_addr = in4->sin_addr.s_addr;
+    t4->t4_port = in4->sin_port;
+    *buf = t4;
+    *added = sizeof (struct IPv4TcpAddress);
+  }
+  else if (socket_address.ss_family == AF_INET6)
+  {
+    struct IPv6TcpAddress *t6;
+    struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
+    t6 = GNUNET_malloc (sizeof (struct IPv6TcpAddress));
+    t6->ipv6_addr = in6->sin6_addr;
+    t6->t6_port = in6->sin6_port;
+    *buf = t6;
+    *added = sizeof (struct IPv6TcpAddress);
+  }
+  return GNUNET_SYSERR;
+}
+
+
 struct SessionClientCtx
 {
   const struct GNUNET_SERVER_Client *client;
@@ -618,9 +716,11 @@ create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
   GNUNET_CONTAINER_DLL_insert (ret->pending_messages_head,
                                ret->pending_messages_tail, pm);
   if (is_nat != GNUNET_YES)
+  {
     GNUNET_STATISTICS_update (plugin->env->stats,
                               gettext_noop ("# TCP sessions active"), 1,
                               GNUNET_NO);
+  }
   return ret;
 }
 
@@ -807,7 +907,14 @@ disconnect_session (struct Session *session)
                    GNUNET_i2s (&session->target),
                    tcp_address_to_string(NULL, session->addr, session->addrlen));
 
-  GNUNET_assert (GNUNET_YES  == GNUNET_CONTAINER_multihashmap_remove(plugin->sessionmap, &session->target.hashPubKey, session));
+   if (GNUNET_YES  == GNUNET_CONTAINER_multihashmap_remove(plugin->sessionmap, &session->target.hashPubKey, session))
+   {
+     GNUNET_STATISTICS_update (session->plugin->env->stats,
+                               gettext_noop ("# TCP sessions active"), -1,
+                               GNUNET_NO);
+     dec_sessions (plugin, session, __LINE__);
+   }
+   else GNUNET_assert (GNUNET_YES  == GNUNET_CONTAINER_multihashmap_remove(plugin->nat_wait_conns, &session->target.hashPubKey, session));
 
   /* clean up state */
   if (session->transmit_handle != NULL)
@@ -817,6 +924,13 @@ disconnect_session (struct Session *session)
   }
   session->plugin->env->session_end (session->plugin->env->cls,
                                      &session->target, session);
+
+  if (session->nat_connection_timeout != GNUNET_SCHEDULER_NO_TASK)
+  {
+    GNUNET_SCHEDULER_cancel (session->nat_connection_timeout);
+    session->nat_connection_timeout = GNUNET_SCHEDULER_NO_TASK;
+  }
+
   while (NULL != (pm = session->pending_messages_head))
   {
 #if DEBUG_TCP
@@ -840,7 +954,6 @@ disconnect_session (struct Session *session)
                          GNUNET_SYSERR);
     GNUNET_free (pm);
   }
-  GNUNET_break (session->client != NULL);
   if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK)
   {
     GNUNET_SCHEDULER_cancel (session->receive_delay_task);
@@ -852,9 +965,8 @@ disconnect_session (struct Session *session)
     GNUNET_SERVER_client_drop (session->client);
     session->client = NULL;
   }
-  GNUNET_STATISTICS_update (session->plugin->env->stats,
-                            gettext_noop ("# TCP sessions active"), -1,
-                            GNUNET_NO);
+
+
   GNUNET_free_non_null (session->addr);
   GNUNET_assert (NULL == session->transmit_handle);
   GNUNET_free (session);
@@ -901,13 +1013,7 @@ tcp_plugin_send (void *cls,
 
   GNUNET_assert (plugin != NULL);
   GNUNET_assert (session != NULL);
-  GNUNET_assert (session->client != NULL);
 
-  GNUNET_SERVER_client_set_timeout (session->client,
-                                    GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-  GNUNET_STATISTICS_update (plugin->env->stats,
-                            gettext_noop ("# bytes currently in TCP buffers"),
-                            msgbuf_size, GNUNET_NO);
   /* create new message entry */
   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msgbuf_size);
   pm->msg = (const char *) &pm[1];
@@ -917,16 +1023,51 @@ tcp_plugin_send (void *cls,
   pm->transmit_cont = cont;
   pm->transmit_cont_cls = cont_cls;
 
-  /* append pm to pending_messages list */
-  GNUNET_CONTAINER_DLL_insert_tail (session->pending_messages_head,
-                                    session->pending_messages_tail, pm);
 
   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp",
                    "Asked to transmit %u bytes to `%s', added message to list.\n",
                    msgbuf_size, GNUNET_i2s (&session->target));
 
-  process_pending_messages (session);
-  return msgbuf_size;
+  if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessionmap, &session->target.hashPubKey, session))
+  {
+    GNUNET_assert (session->client != NULL);
+
+    GNUNET_SERVER_client_set_timeout (session->client,
+                                      GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+    GNUNET_STATISTICS_update (plugin->env->stats,
+                              gettext_noop ("# bytes currently in TCP buffers"),
+                              msgbuf_size, GNUNET_NO);
+
+    /* append pm to pending_messages list */
+    GNUNET_CONTAINER_DLL_insert_tail (session->pending_messages_head,
+                                      session->pending_messages_tail, pm);
+
+    process_pending_messages (session);
+    return msgbuf_size;
+  }
+  else if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains_value(plugin->nat_wait_conns, &session->target.hashPubKey, session))
+  {
+    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp",
+                     "This NAT WAIT session for peer `%s' is not yet ready!\n",
+                     GNUNET_i2s (&session->target));
+
+    GNUNET_STATISTICS_update (plugin->env->stats,
+                              gettext_noop ("# bytes currently in TCP buffers"),
+                              msgbuf_size, GNUNET_NO);
+
+    /* append pm to pending_messages list */
+    GNUNET_CONTAINER_DLL_insert_tail (session->pending_messages_head,
+                                      session->pending_messages_tail, pm);
+    return msgbuf_size;
+  }
+  else
+  {
+    if (cont != NULL)
+      cont (cont_cls, &session->target, GNUNET_SYSERR);
+    GNUNET_break (0);
+    GNUNET_free (pm);
+    return GNUNET_SYSERR; /* session does not exist here */
+  }
 }
 
 struct SessionItCtx
@@ -979,6 +1120,22 @@ int session_lookup_it (void *cls,
   return GNUNET_NO;
 }
 
+/**
+ * Task cleaning up a NAT connection attempt after timeout
+ */
+
+static void
+nat_connect_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct Session *session = cls;
+
+  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp",
+                   "NAT WAIT connection to `%4s' at `%s' could not be established, removing session\n",
+                   GNUNET_i2s (&session->target), tcp_address_to_string(NULL, session->addr, session->addrlen));
+
+  disconnect_session (session);
+
+}
 
 /**
  * Create a new session to transmit data to the target
@@ -1119,18 +1276,30 @@ tcp_plugin_get_session (void *cls,
     session->addrlen = 0;
     session->addr = NULL;
     session->ats_address_network_type = ats.value;
+    session->nat_connection_timeout = GNUNET_SCHEDULER_add_delayed(NAT_TIMEOUT,
+        &nat_connect_timeout,
+        session);
     GNUNET_assert (session != NULL);
 
     GNUNET_assert (GNUNET_CONTAINER_multihashmap_put
                    (plugin->nat_wait_conns, &address->peer.hashPubKey, session,
                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY) == GNUNET_OK);
-#if DEBUG_TCP_NAT
+
     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp",
                      "Created NAT WAIT connection to `%4s' at `%s'\n",
                      GNUNET_i2s (&session->target), GNUNET_a2s (sb, sbs));
-#endif
-    GNUNET_NAT_run_client (plugin->nat, &a4);
-    return session;
+
+    if (GNUNET_OK == GNUNET_NAT_run_client (plugin->nat, &a4))
+      return session;
+    else
+    {
+      GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp",
+                       "Running NAT client for `%4s' at `%s' failed\n",
+                       GNUNET_i2s (&session->target), GNUNET_a2s (sb, sbs));
+
+      disconnect_session (session);
+      return NULL;
+    }
   }
 
   /* create new outbound session */
@@ -1161,7 +1330,7 @@ tcp_plugin_get_session (void *cls,
   session->ats_address_network_type = ats.value;
 
   GNUNET_CONTAINER_multihashmap_put(plugin->sessionmap, &address->peer.hashPubKey, session, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
+  inc_sessions (plugin, session, __LINE__);
   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp",
                    "Creating new session for `%s' address `%s' session %p\n",
                    GNUNET_i2s (&address->peer),
@@ -1189,26 +1358,6 @@ int session_disconnect_it (void *cls,
   return GNUNET_YES;
 }
 
-int session_nat_disconnect_it (void *cls,
-               const GNUNET_HashCode * key,
-               void *value)
-{
-  struct Session *session = value;
-
-  if (session != NULL)
-  {
-    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp",
-                     "Cleaning up pending NAT session for peer `%4s'\n", GNUNET_i2s (&session->target));
-    GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (session->plugin->nat_wait_conns, &session->target.hashPubKey, session));
-    GNUNET_SERVER_client_drop (session->client);
-    GNUNET_SERVER_receive_done (session->client, GNUNET_SYSERR);
-    GNUNET_free (session);
-  }
-
-  return GNUNET_YES;
-}
-
-
 /**
  * Function that can be called to force a disconnect from the
  * specified neighbour.  This should also cancel all previously
@@ -1241,10 +1390,7 @@ tcp_plugin_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
   {
     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp",
                      "Cleaning up pending NAT session for peer `%4s'\n", GNUNET_i2s (target));
-    GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (plugin->nat_wait_conns, &target->hashPubKey, nat_session));
-    GNUNET_SERVER_client_drop (nat_session->client);
-    GNUNET_SERVER_receive_done (nat_session->client, GNUNET_SYSERR);
-    GNUNET_free (nat_session);
+    disconnect_session (nat_session);
   }
 }
 
@@ -1268,6 +1414,8 @@ struct PrettyPrinterContext
    * Port to add after the IP address.
    */
   uint16_t port;
+
+  int ipv6;
 };
 
 
@@ -1289,7 +1437,10 @@ append_port (void *cls, const char *hostname)
     GNUNET_free (ppc);
     return;
   }
-  GNUNET_asprintf (&ret, "%s:%d", hostname, ppc->port);
+  if (GNUNET_YES == ppc->ipv6)
+    GNUNET_asprintf (&ret, "[%s]:%d", hostname, ppc->port);
+  else
+    GNUNET_asprintf (&ret, "%s:%d", hostname, ppc->port);
   ppc->asc (ppc->asc_cls, ret);
   GNUNET_free (ret);
 }
@@ -1351,11 +1502,17 @@ tcp_plugin_address_pretty_printer (void *cls, const char *type,
   else
   {
     /* invalid address */
+    GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "tcp",
+        "Invalid address to string request: plugin `%s', address length: %u bytes\n");
     GNUNET_break_op (0);
     asc (asc_cls, NULL);
     return;
   }
   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
+  if (addrlen == sizeof (struct IPv6TcpAddress))
+    ppc->ipv6 = GNUNET_YES;
+  else
+    ppc->ipv6 = GNUNET_NO;
   ppc->asc = asc;
   ppc->asc_cls = asc_cls;
   ppc->port = port;
@@ -1501,6 +1658,12 @@ handle_tcp_nat_probe (void *cls, struct GNUNET_SERVER_Client *client,
   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp",
                    "Found session for NAT probe!\n");
 
+  if (session->nat_connection_timeout != GNUNET_SCHEDULER_NO_TASK)
+  {
+    GNUNET_SCHEDULER_cancel (session->nat_connection_timeout);
+    session->nat_connection_timeout = GNUNET_SCHEDULER_NO_TASK;
+  }
+
   GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
                  (plugin->nat_wait_conns,
                   &tcp_nat_probe->clientIdentity.hashPubKey,
@@ -1556,7 +1719,7 @@ handle_tcp_nat_probe (void *cls, struct GNUNET_SERVER_Client *client,
   GNUNET_free (vaddr);
 
   GNUNET_CONTAINER_multihashmap_put(plugin->sessionmap, &session->target.hashPubKey, session, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
+  inc_sessions (plugin, session, __LINE__);
   GNUNET_STATISTICS_update (plugin->env->stats,
                             gettext_noop ("# TCP sessions active"), 1,
                             GNUNET_NO);
@@ -1656,6 +1819,7 @@ handle_tcp_welcome (void *cls, struct GNUNET_SERVER_Client *client,
 #endif
     }
     GNUNET_CONTAINER_multihashmap_put(plugin->sessionmap, &wm->clientIdentity.hashPubKey, session, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+    inc_sessions (plugin, session, __LINE__);
   }
 
   if (session->expecting_welcome != GNUNET_YES)
@@ -1732,6 +1896,14 @@ handle_tcp_data (void *cls, struct GNUNET_SERVER_Client *client,
   if (NULL == session)
   {
     /* No inbound session found */
+    void *vaddr;
+    size_t alen;
+    GNUNET_SERVER_client_get_address (client, &vaddr, &alen);
+    GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "tcp",
+                     "Received unexpected %u bytes of type %u from `%s'\n",
+                     (unsigned int) ntohs (message->size),
+                     (unsigned int) ntohs (message->type),
+                     GNUNET_a2s(vaddr, alen));
     GNUNET_break_op (0);
     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
     return;
@@ -1739,6 +1911,14 @@ handle_tcp_data (void *cls, struct GNUNET_SERVER_Client *client,
   else if (GNUNET_YES == session->expecting_welcome)
   {
     /* Session is expecting WELCOME message */
+    void *vaddr;
+    size_t alen;
+    GNUNET_SERVER_client_get_address (client, &vaddr, &alen);
+    GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "tcp",
+                     "Received unexpected %u bytes of type %u from `%s'\n",
+                     (unsigned int) ntohs (message->size),
+                     (unsigned int) ntohs (message->type),
+                     GNUNET_a2s(vaddr, alen));
     GNUNET_break_op (0);
     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
     return;
@@ -1942,6 +2122,18 @@ libgnunet_plugin_transport_tcp_init (void *cls)
   struct sockaddr **addrs;
   socklen_t *addrlens;
 
+  if (NULL == env->receive)
+  {
+    /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
+       initialze the plugin or the API */
+    api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
+    api->cls = NULL;
+    api->address_pretty_printer = &tcp_plugin_address_pretty_printer;
+    api->address_to_string = &tcp_address_to_string;
+    api->string_to_address = &tcp_string_to_address;
+    return api;
+  }
+
   if (GNUNET_OK !=
       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-tcp",
                                              "MAX_CONNECTIONS",
@@ -1980,8 +2172,6 @@ libgnunet_plugin_transport_tcp_init (void *cls)
   else
     service = NULL;
 
-
-
   plugin = GNUNET_malloc (sizeof (struct Plugin));
   plugin->sessionmap = GNUNET_CONTAINER_multihashmap_create(max_connections);
   plugin->max_connections = max_connections;
@@ -2024,6 +2214,7 @@ libgnunet_plugin_transport_tcp_init (void *cls)
   api->address_pretty_printer = &tcp_plugin_address_pretty_printer;
   api->check_address = &tcp_plugin_check_address;
   api->address_to_string = &tcp_address_to_string;
+  api->string_to_address = &tcp_string_to_address;
   plugin->service = service;
   if (service != NULL)
   {
@@ -2069,6 +2260,10 @@ libgnunet_plugin_transport_tcp_init (void *cls)
                      _
                      ("TCP transport advertises itself as being on port %llu\n"),
                      aport);
+  /* Initially set connections to 0 */
+  GNUNET_STATISTICS_set(plugin->env->stats,
+                        gettext_noop ("# TCP sessions active"), 0,
+                        GNUNET_NO);
   return api;
 }
 
@@ -2083,13 +2278,17 @@ libgnunet_plugin_transport_tcp_done (void *cls)
   struct Plugin *plugin = api->cls;
   struct TCPProbeContext *tcp_probe;
 
+  if (NULL == plugin)
+  {
+    GNUNET_free (api);
+    return NULL;
+  }
   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", "Shutting down TCP plugin\n");
 
-
   /* Removing leftover sessions */
   GNUNET_CONTAINER_multihashmap_iterate(plugin->sessionmap, &session_disconnect_it, NULL);
   /* Removing leftover NAT sessions */
-  GNUNET_CONTAINER_multihashmap_iterate(plugin->nat_wait_conns, &session_nat_disconnect_it, NULL);
+  GNUNET_CONTAINER_multihashmap_iterate(plugin->nat_wait_conns, &session_disconnect_it, NULL);
 
   if (plugin->service != NULL)
     GNUNET_SERVICE_stop (plugin->service);