*/
GNUNET_SCHEDULER_TaskIdentifier receive_delay_task;
+ /**
+ * Session timeout task
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
/**
* Address of the other peer (either based on our 'connect'
* call or on our 'accept' call).
};
+
+/**
+ * Start session timeout
+ */
+static void
+start_session_timeout (struct Session *s);
+
+/**
+ * Increment session timeout due to activity
+ */
+static void
+reschedule_session_timeout (struct Session *s);
+
+/**
+ * Cancel timeout
+ */
+static void
+stop_session_timeout (struct Session *s);
+
+
/* DEBUG CODE */
static const char *
tcp_address_to_string (void *cls, const void *addr, size_t addrlen);
gettext_noop ("# TCP sessions active"), 1,
GNUNET_NO);
}
+ start_session_timeout (ret);
+
return ret;
}
GNUNET_i2s (&session->target),
tcp_address_to_string(NULL, session->addr, session->addrlen));
- if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(plugin->sessionmap, &session->target.hashPubKey, session))
- {
+ stop_session_timeout (session);
+
+ if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(plugin->sessionmap, &session->target.hashPubKey, session))
+ {
GNUNET_STATISTICS_update (session->plugin->env->stats,
gettext_noop ("# TCP sessions active"), -1,
GNUNET_NO);
dec_sessions (plugin, session, __LINE__);
- }
- else GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(plugin->nat_wait_conns, &session->target.hashPubKey, session));
+ }
+ else GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(plugin->nat_wait_conns, &session->target.hashPubKey, session));
/* clean up state */
if (session->transmit_handle != NULL)
"Asked to transmit %u bytes to `%s', added message to list.\n",
msgbuf_size, GNUNET_i2s (&session->target));
+ reschedule_session_timeout (session);
+
if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessionmap, &session->target.hashPubKey, session))
{
GNUNET_assert (session->client != NULL);
session->plugin->env->receive (session->plugin->env->cls,
&session->target, NULL, &ats, 0, session,
NULL, 0);
+ reschedule_session_timeout (session);
+
if (delay.rel_value == 0)
GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
else
1, session,
(GNUNET_YES == session->inbound) ? NULL : session->addr,
(GNUNET_YES == session->inbound) ? 0 : session->addrlen);
+
+ reschedule_session_timeout (session);
+
if (delay.rel_value == 0)
{
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
+/**
+ * Session was idle, so disconnect it
+ */
+static void
+session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ GNUNET_assert (NULL != cls);
+ struct Session *s = cls;
+
+ s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %p was idle for %llu, disconnecting\n",
+ s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+
+ /* call session destroy function */
+ disconnect_session(s);
+
+}
+
+/**
+ * Start session timeout
+ */
+static void
+start_session_timeout (struct Session *s)
+{
+ GNUNET_assert (NULL != s);
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
+
+ s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &session_timeout,
+ s);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p set to %llu\n",
+ s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+}
+
+/**
+ * Increment session timeout due to activity
+ */
+static void
+reschedule_session_timeout (struct Session *s)
+{
+ GNUNET_assert (NULL != s);
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
+
+ GNUNET_SCHEDULER_cancel (s->timeout_task);
+ s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &session_timeout,
+ s);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p set to %llu\n",
+ s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+}
+
+/**
+ * Cancel timeout
+ */
+static void
+stop_session_timeout (struct Session *s)
+{
+ GNUNET_assert (NULL != s);
+
+ if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
+ {
+ GNUNET_SCHEDULER_cancel (s->timeout_task);
+ s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p canceled\n",
+ s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p was not active\n",
+ s);
+ }
+}
+
+
/**
* Entry point for the plugin.
*
*/
struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
+ /**
+ * Session timeout task
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
/**
* expected delay for ACKs
*/
};
+/**
+ * Encapsulation of all of the state of the plugin.
+ */
+struct Plugin * plugin;
+
/**
* We have been notified that our readset has something to read. We don't
static void
udp_plugin_select_v6 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+/**
+ * Start session timeout
+ */
+static void
+start_session_timeout (struct Session *s);
+
+/**
+ * Increment session timeout due to activity
+ */
+static void
+reschedule_session_timeout (struct Session *s);
+
+/**
+ * Cancel timeout
+ */
+static void
+stop_session_timeout (struct Session *s);
+
+
+
/**
* Function called for a quick conversion of the binary address to
* a numeric address. Note that the caller must not free the
/**
- * Destroy a session, plugin is being unloaded.
+ * Functions with this signature are called whenever we need
+ * to close a session due to a disconnect or failure to
+ * establish a connection.
*
- * @param cls unused
- * @param key hash of public key of target peer
- * @param value a 'struct PeerSession*' to clean up
- * @return GNUNET_OK (continue to iterate)
+ * @param session session to close down
*/
-static int
-disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value)
+static void
+disconnect_session (struct Session *s)
{
- struct Plugin *plugin = cls;
- struct Session *s = value;
struct UDPMessageWrapper *udpw;
struct UDPMessageWrapper *next;
s,
GNUNET_i2s (&s->target),
GNUNET_a2s (s->sock_addr, s->addrlen));
+ stop_session_timeout(s);
next = plugin->ipv4_queue_head;
while (NULL != (udpw = next))
{
s->in_destroy = GNUNET_YES;
else
free_session (s);
+}
+
+/**
+ * Destroy a session, plugin is being unloaded.
+ *
+ * @param cls unused
+ * @param key hash of public key of target peer
+ * @param value a 'struct PeerSession*' to clean up
+ * @return GNUNET_OK (continue to iterate)
+ */
+static int
+disconnect_and_free_it (void *cls, const GNUNET_HashCode * key, void *value)
+{
+ disconnect_session(value);
return GNUNET_OK;
}
s->flow_delay_from_other_peer = GNUNET_TIME_absolute_get_zero();
s->last_expected_delay = GNUNET_TIME_UNIT_SECONDS;
+ start_session_timeout(s);
+
return s;
}
udp->reserved = htonl (0);
udp->sender = *plugin->env->my_identity;
+ reschedule_session_timeout(s);
if (mlen <= UDP_MTU)
{
udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen);
si->arg,
si->args);
si->session->flow_delay_for_other_peer = delay;
+ reschedule_session_timeout(si->session);
return GNUNET_OK;
}
return sockets_created;
}
+/**
+ * Session was idle, so disconnect it
+ */
+static void
+session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ GNUNET_assert (NULL != cls);
+ struct Session *s = cls;
+
+ s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %p was idle for %llu, disconnecting\n",
+ s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+
+ /* call session destroy function */
+ disconnect_session(s);
+
+}
+
+/**
+ * Start session timeout
+ */
+static void
+start_session_timeout (struct Session *s)
+{
+ GNUNET_assert (NULL != s);
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
+
+ s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &session_timeout,
+ s);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p set to %llu\n",
+ s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+}
+
+/**
+ * Increment session timeout due to activity
+ */
+static void
+reschedule_session_timeout (struct Session *s)
+{
+ GNUNET_assert (NULL != s);
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
+
+ GNUNET_SCHEDULER_cancel (s->timeout_task);
+ s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+ &session_timeout,
+ s);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p set to %llu\n",
+ s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+}
+
+/**
+ * Cancel timeout
+ */
+static void
+stop_session_timeout (struct Session *s)
+{
+ GNUNET_assert (NULL != s);
+
+ if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
+ {
+ GNUNET_SCHEDULER_cancel (s->timeout_task);
+ s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout rescheduled for session %p canceled\n",
+ s, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Timeout for session %p was not active\n",
+ s);
+ }
+}
/**
* The exported method. Makes the core api available via a global and
{
struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
struct GNUNET_TRANSPORT_PluginFunctions *api;
- struct Plugin *plugin;
+ struct Plugin *p;
unsigned long long port;
unsigned long long aport;
unsigned long long broadcast;
udp_max_bps = 1024 * 1024 * 50; /* 50 MB/s == infinity for practical purposes */
}
- plugin = GNUNET_malloc (sizeof (struct Plugin));
+ p = GNUNET_malloc (sizeof (struct Plugin));
api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
- GNUNET_BANDWIDTH_tracker_init (&plugin->tracker,
+ GNUNET_BANDWIDTH_tracker_init (&p->tracker,
GNUNET_BANDWIDTH_value_init ((uint32_t)udp_max_bps), 30);
- plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10);
- plugin->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
- plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin);
- plugin->port = port;
- plugin->aport = aport;
- plugin->broadcast_interval = interval;
- plugin->enable_ipv6 = enable_v6;
- plugin->env = env;
-
- api->cls = plugin;
+ p->sessions = GNUNET_CONTAINER_multihashmap_create (10);
+ p->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+ p->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, p);
+ p->port = port;
+ p->aport = aport;
+ p->broadcast_interval = interval;
+ p->enable_ipv6 = enable_v6;
+ p->env = env;
+
+ plugin = p;
+
+ api->cls = p;
api->send = NULL;
api->disconnect = &udp_disconnect;
api->address_pretty_printer = &udp_plugin_address_pretty_printer;
api->send = &udp_plugin_send;
LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n");
- res = setup_sockets (plugin, &serverAddrv6, &serverAddrv4);
- if ((res == 0) || ((plugin->sockv4 == NULL) && (plugin->sockv6 == NULL)))
+ res = setup_sockets (p, &serverAddrv6, &serverAddrv4);
+ if ((res == 0) || ((p->sockv4 == NULL) && (p->sockv6 == NULL)))
{
LOG (GNUNET_ERROR_TYPE_ERROR, "Failed to create network sockets, plugin failed\n");
- GNUNET_free (plugin);
+ GNUNET_free (p);
GNUNET_free (api);
return NULL;
}
if (broadcast == GNUNET_YES)
{
LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n");
- setup_broadcast (plugin, &serverAddrv6, &serverAddrv4);
+ setup_broadcast (p, &serverAddrv6, &serverAddrv4);
}
GNUNET_free_non_null (bind4_address);