session timeout for udp and tcp
authorMatthias Wachs <wachs@net.in.tum.de>
Fri, 25 May 2012 09:34:49 +0000 (09:34 +0000)
committerMatthias Wachs <wachs@net.in.tum.de>
Fri, 25 May 2012 09:34:49 +0000 (09:34 +0000)
src/transport/plugin_transport_tcp.c
src/transport/plugin_transport_udp.c

index 8e76398ea81a16029de5c9a4869c004de991d0df..18482b11ec9bd883bbb0bc42dd4797ac8c9281f8 100644 (file)
@@ -272,6 +272,11 @@ struct Session
    */
   GNUNET_SCHEDULER_TaskIdentifier receive_delay_task;
 
+  /**
+   * Session timeout task
+   */
+  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
   /**
    * Address of the other peer (either based on our 'connect'
    * call or on our 'accept' call).
@@ -395,6 +400,26 @@ struct Plugin
 
 };
 
+
+/**
+ * Start session timeout
+ */
+static void
+start_session_timeout (struct Session *s);
+
+/**
+ * Increment session timeout due to activity
+ */
+static void
+reschedule_session_timeout (struct Session *s);
+
+/**
+ * Cancel timeout
+ */
+static void
+stop_session_timeout (struct Session *s);
+
+
 /* DEBUG CODE */
 static const char *
 tcp_address_to_string (void *cls, const void *addr, size_t addrlen);
@@ -740,6 +765,8 @@ create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
                               gettext_noop ("# TCP sessions active"), 1,
                               GNUNET_NO);
   }
+  start_session_timeout (ret);
+
   return ret;
 }
 
@@ -919,14 +946,16 @@ disconnect_session (struct Session *session)
        GNUNET_i2s (&session->target),
        tcp_address_to_string(NULL, session->addr, session->addrlen));
 
-   if (GNUNET_YES  == GNUNET_CONTAINER_multihashmap_remove(plugin->sessionmap, &session->target.hashPubKey, session))
-   {
+  stop_session_timeout (session);
+
+  if (GNUNET_YES  == GNUNET_CONTAINER_multihashmap_remove(plugin->sessionmap, &session->target.hashPubKey, session))
+  {
      GNUNET_STATISTICS_update (session->plugin->env->stats,
                                gettext_noop ("# TCP sessions active"), -1,
                                GNUNET_NO);
      dec_sessions (plugin, session, __LINE__);
-   }
-   else GNUNET_assert (GNUNET_YES  == GNUNET_CONTAINER_multihashmap_remove(plugin->nat_wait_conns, &session->target.hashPubKey, session));
+  }
+  else GNUNET_assert (GNUNET_YES  == GNUNET_CONTAINER_multihashmap_remove(plugin->nat_wait_conns, &session->target.hashPubKey, session));
 
   /* clean up state */
   if (session->transmit_handle != NULL)
@@ -1037,6 +1066,8 @@ tcp_plugin_send (void *cls,
        "Asked to transmit %u bytes to `%s', added message to list.\n",
        msgbuf_size, GNUNET_i2s (&session->target));
 
+  reschedule_session_timeout (session);
+
   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessionmap, &session->target.hashPubKey, session))
   {
     GNUNET_assert (session->client != NULL);
@@ -1850,6 +1881,8 @@ delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
       session->plugin->env->receive (session->plugin->env->cls,
                                      &session->target, NULL, &ats, 0, session,
                                      NULL, 0);
+  reschedule_session_timeout (session);
+
   if (delay.rel_value == 0)
     GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
   else
@@ -1948,6 +1981,9 @@ handle_tcp_data (void *cls, struct GNUNET_SERVER_Client *client,
                                 1, session,
                                 (GNUNET_YES == session->inbound) ? NULL : session->addr,
                                 (GNUNET_YES == session->inbound) ? 0 : session->addrlen);
+
+  reschedule_session_timeout (session);
+
   if (delay.rel_value == 0)
   {
     GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -2086,6 +2122,84 @@ try_connection_reversal (void *cls, const struct sockaddr *addr,
 }
 
 
+/**
+ * Session was idle, so disconnect it
+ */
+static void
+session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  GNUNET_assert (NULL != cls);
+  struct Session *s = cls;
+
+  s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %p was idle for %llu, disconnecting\n",
+      s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+
+  /* call session destroy function */
+  disconnect_session(s);
+
+}
+
+/**
+ * Start session timeout
+ */
+static void
+start_session_timeout (struct Session *s)
+{
+  GNUNET_assert (NULL != s);
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
+
+  s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                                   &session_timeout,
+                                                   s);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p set to %llu\n",
+      s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+}
+
+/**
+ * Increment session timeout due to activity
+ */
+static void
+reschedule_session_timeout (struct Session *s)
+{
+  GNUNET_assert (NULL != s);
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
+
+  GNUNET_SCHEDULER_cancel (s->timeout_task);
+  s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                                   &session_timeout,
+                                                   s);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p set to %llu\n",
+      s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+}
+
+/**
+ * Cancel timeout
+ */
+static void
+stop_session_timeout (struct Session *s)
+{
+  GNUNET_assert (NULL != s);
+
+  if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
+  {
+    GNUNET_SCHEDULER_cancel (s->timeout_task);
+    s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p canceled\n",
+      s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+  }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p was not active\n",
+      s);
+  }
+}
+
+
 /**
  * Entry point for the plugin.
  *
index 60814327cf8cbfb33c8db5f954cec3154fa5e2ef..5b133951b9c1df69c9c81754fef5bb85716a2668 100644 (file)
@@ -110,6 +110,11 @@ struct Session
    */
   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
 
+  /**
+   * Session timeout task
+   */
+  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
   /**
    * expected delay for ACKs
    */
@@ -293,6 +298,11 @@ struct UDP_ACK_Message
 
 };
 
+/**
+ * Encapsulation of all of the state of the plugin.
+ */
+struct Plugin * plugin;
+
 
 /**
  * We have been notified that our readset has something to read.  We don't
@@ -316,6 +326,26 @@ udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
 static void
 udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
 
+/**
+ * Start session timeout
+ */
+static void
+start_session_timeout (struct Session *s);
+
+/**
+ * Increment session timeout due to activity
+ */
+static void
+reschedule_session_timeout (struct Session *s);
+
+/**
+ * Cancel timeout
+ */
+static void
+stop_session_timeout (struct Session *s);
+
+
+
 /**
  * Function called for a quick conversion of the binary address to
  * a numeric address.  Note that the caller must not free the
@@ -649,18 +679,15 @@ free_session (struct Session *s)
 
 
 /**
- * Destroy a session, plugin is being unloaded.
+ * Functions with this signature are called whenever we need
+ * to close a session due to a disconnect or failure to
+ * establish a connection.
  *
- * @param cls unused
- * @param key hash of public key of target peer
- * @param value a 'struct PeerSession*' to clean up
- * @return GNUNET_OK (continue to iterate)
+ * @param session session to close down
  */
-static int
-disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value)
+static void
+disconnect_session (struct Session *s)
 {
-  struct Plugin *plugin = cls;
-  struct Session *s = value;
   struct UDPMessageWrapper *udpw;
   struct UDPMessageWrapper *next;
 
@@ -670,6 +697,7 @@ disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value)
          s,
          GNUNET_i2s (&s->target),
          GNUNET_a2s (s->sock_addr, s->addrlen));
+  stop_session_timeout(s);
   next = plugin->ipv4_queue_head;
   while (NULL != (udpw = next))
   {
@@ -718,6 +746,20 @@ disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value)
     s->in_destroy = GNUNET_YES;
   else
     free_session (s);
+}
+
+/**
+ * Destroy a session, plugin is being unloaded.
+ *
+ * @param cls unused
+ * @param key hash of public key of target peer
+ * @param value a 'struct PeerSession*' to clean up
+ * @return GNUNET_OK (continue to iterate)
+ */
+static int
+disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value)
+{
+  disconnect_session(value);
   return GNUNET_OK;
 }
 
@@ -804,6 +846,8 @@ create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
   s->flow_delay_from_other_peer = GNUNET_TIME_absolute_get_zero();
   s->last_expected_delay = GNUNET_TIME_UNIT_SECONDS;
 
+  start_session_timeout(s);
+
   return s;
 }
 
@@ -1129,6 +1173,7 @@ udp_plugin_send (void *cls,
   udp->reserved = htonl (0);
   udp->sender = *plugin->env->my_identity;
 
+  reschedule_session_timeout(s);
   if (mlen <= UDP_MTU)
   {
     udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen);
@@ -1289,6 +1334,7 @@ process_inbound_tokenized_messages (void *cls, void *client,
                 si->arg,
                 si->args);
   si->session->flow_delay_for_other_peer = delay;
+  reschedule_session_timeout(si->session);
   return GNUNET_OK;
 }
 
@@ -2141,6 +2187,82 @@ setup_sockets (struct Plugin *plugin, struct sockaddr_in6 *serverAddrv6, struct
   return sockets_created;
 }
 
+/**
+ * Session was idle, so disconnect it
+ */
+static void
+session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  GNUNET_assert (NULL != cls);
+  struct Session *s = cls;
+
+  s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %p was idle for %llu, disconnecting\n",
+      s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+
+  /* call session destroy function */
+  disconnect_session(s);
+
+}
+
+/**
+ * Start session timeout
+ */
+static void
+start_session_timeout (struct Session *s)
+{
+  GNUNET_assert (NULL != s);
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
+
+  s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                                   &session_timeout,
+                                                   s);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p set to %llu\n",
+      s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+}
+
+/**
+ * Increment session timeout due to activity
+ */
+static void
+reschedule_session_timeout (struct Session *s)
+{
+  GNUNET_assert (NULL != s);
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
+
+  GNUNET_SCHEDULER_cancel (s->timeout_task);
+  s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                                   &session_timeout,
+                                                   s);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p set to %llu\n",
+      s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+}
+
+/**
+ * Cancel timeout
+ */
+static void
+stop_session_timeout (struct Session *s)
+{
+  GNUNET_assert (NULL != s);
+
+  if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
+  {
+    GNUNET_SCHEDULER_cancel (s->timeout_task);
+    s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p canceled\n",
+      s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+  }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p was not active\n",
+      s);
+  }
+}
 
 /**
  * The exported method. Makes the core api available via a global and
@@ -2154,7 +2276,7 @@ libgnunet_plugin_transport_udp_init (void *cls)
 {
   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
   struct GNUNET_TRANSPORT_PluginFunctions *api;
-  struct Plugin *plugin;
+  struct Plugin *p;
   unsigned long long port;
   unsigned long long aport;
   unsigned long long broadcast;
@@ -2263,21 +2385,23 @@ libgnunet_plugin_transport_udp_init (void *cls)
     udp_max_bps = 1024 * 1024 * 50;     /* 50 MB/s == infinity for practical purposes */
   }
 
-  plugin = GNUNET_malloc (sizeof (struct Plugin));
+  p = GNUNET_malloc (sizeof (struct Plugin));
   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
 
-  GNUNET_BANDWIDTH_tracker_init (&plugin->tracker,
+  GNUNET_BANDWIDTH_tracker_init (&p->tracker,
                                  GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
-  plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10);
-  plugin->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
-  plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin);
-  plugin->port = port;
-  plugin->aport = aport;
-  plugin->broadcast_interval = interval;
-  plugin->enable_ipv6 = enable_v6;
-  plugin->env = env;
-
-  api->cls = plugin;
+  p->sessions = GNUNET_CONTAINER_multihashmap_create (10);
+  p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+  p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, p);
+  p->port = port;
+  p->aport = aport;
+  p->broadcast_interval = interval;
+  p->enable_ipv6 = enable_v6;
+  p->env = env;
+
+  plugin = p;
+
+  api->cls = p;
   api->send = NULL;
   api->disconnect = &udp_disconnect;
   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
@@ -2288,11 +2412,11 @@ libgnunet_plugin_transport_udp_init (void *cls)
   api->send = &udp_plugin_send;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
-  res = setup_sockets (plugin, &serverAddrv6, &serverAddrv4);
-  if ((res == 0) || ((plugin->sockv4 == NULL) && (plugin->sockv6 == NULL)))
+  res = setup_sockets (p, &serverAddrv6, &serverAddrv4);
+  if ((res == 0) || ((p->sockv4 == NULL) && (p->sockv6 == NULL)))
   {
     LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n");
-    GNUNET_free (plugin);
+    GNUNET_free (p);
     GNUNET_free (api);
     return NULL;
   }
@@ -2300,7 +2424,7 @@ libgnunet_plugin_transport_udp_init (void *cls)
   if (broadcast == GNUNET_YES)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
-    setup_broadcast (plugin, &serverAddrv6, &serverAddrv4);
+    setup_broadcast (p, &serverAddrv6, &serverAddrv4);
   }
 
   GNUNET_free_non_null (bind4_address);