WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.
-
+
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ SPDX-License-Identifier: AGPL3.0-or-later
*/
/**
* @file ats/gnunet-service-ats-new.c
* @brief ats service
* @author Matthias Wachs
* @author Christian Grothoff
- *
- * TODO:
- * - implement messages ATS -> transport
- * - implement loading / unloading of ATS plugins
- * - expose plugin the API to send messages ATS -> transport
*/
#include "platform.h"
#include "gnunet_util_lib.h"
* Plugin's representation of the preference.
*/
struct GNUNET_ATS_PreferenceHandle *ph;
-
+
/**
* Details about the preference.
*/
{
/**
- * Session data exposed to the plugin.
+ * Session data exposed to the plugin.
*/
struct GNUNET_ATS_SessionData data;
* Session state in the plugin.
*/
struct GNUNET_ATS_SessionHandle *sh;
-
+
/**
* Unique ID for the session when talking with the client.
- */
+ */
uint32_t session_id;
-
+
};
* Head of DLL of preferences expressed by this client.
*/
struct ClientPreference *cp_head;
-
+
/**
* Tail of DLL of preferences expressed by this client.
*/
struct ClientPreference *cp_tail;
-
+
} application;
struct {
* Map from session IDs to `struct GNUNET_ATS_Session` objects.
*/
struct GNUNET_CONTAINER_MultiHashMap32 *sessions;
-
+
} transport;
-
+
} details;
};
/**
* Handle for statistics.
*/
-static struct GNUNET_STATISTICS_Handle *GSA_stats;
+static struct GNUNET_STATISTICS_Handle *stats;
/**
* Our solver.
*/
static struct GNUNET_ATS_SolverFunctions *plugin;
+/**
+ * Solver plugin name as string
+ */
+static char *plugin_name;
+
/**
* The transport client (there can only be one at a time).
*/
static struct Client *transport_client;
+/**
+ * Function called by the solver to prompt the transport to
+ * try out a new address.
+ *
+ * @param cls closure, NULL
+ * @param pid peer this is about
+ * @param address address the transport should try
+ */
+static void
+suggest_cb (void *cls,
+ const struct GNUNET_PeerIdentity *pid,
+ const char *address)
+{
+ struct GNUNET_MQ_Envelope *env;
+ size_t slen = strlen (address) + 1;
+ struct AddressSuggestionMessage *as;
+
+ if (NULL == transport_client)
+ {
+ // FIXME: stats!
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Suggesting address `%s' of peer `%s'\n",
+ address,
+ GNUNET_i2s (pid));
+ env = GNUNET_MQ_msg_extra (as,
+ slen,
+ GNUNET_MESSAGE_TYPE_ATS_ADDRESS_SUGGESTION);
+ as->peer = *pid;
+ memcpy (&as[1],
+ address,
+ slen);
+ GNUNET_MQ_send (transport_client->mq,
+ env);
+}
+
+
+/**
+ * Function called by the solver to tell the transpor to
+ * allocate bandwidth for the specified session.
+ *
+ * @param cls closure, NULL
+ * @param session session this is about
+ * @param peer peer this is about
+ * @param bw_in suggested bandwidth for receiving
+ * @param bw_out suggested bandwidth for transmission
+ */
+static void
+allocate_cb (void *cls,
+ struct GNUNET_ATS_Session *session,
+ const struct GNUNET_PeerIdentity *peer,
+ struct GNUNET_BANDWIDTH_Value32NBO bw_in,
+ struct GNUNET_BANDWIDTH_Value32NBO bw_out)
+{
+ struct GNUNET_MQ_Envelope *env;
+ struct SessionAllocationMessage *sam;
+
+ (void) cls;
+ if ( (NULL == transport_client) ||
+ (session->client != transport_client) )
+ {
+ /* transport must have just died and solver is addressing the
+ losses of sessions (possibly of previous transport), ignore! */
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Allocating %u/%u bytes for %p of peer `%s'\n",
+ ntohl (bw_in.value__),
+ ntohl (bw_out.value__),
+ session,
+ GNUNET_i2s (peer));
+ env = GNUNET_MQ_msg (sam,
+ GNUNET_MESSAGE_TYPE_ATS_SESSION_ALLOCATION);
+ sam->session_id = session->session_id;
+ sam->peer = *peer;
+ sam->bandwidth_in = bw_in;
+ sam->bandwidth_out = bw_out;
+ GNUNET_MQ_send (transport_client->mq,
+ env);
+}
+
+
/**
* Convert @a properties to @a prop
*
/**
- * We have received a `struct ExpressPreferenceMessage` from an application client.
+ * We have received a `struct ExpressPreferenceMessage` from an application client.
*
* @param cls handle to the client
* @param msg the start message
*/
static void
handle_suggest (void *cls,
- const struct ExpressPreferenceMessage *msg)
+ const struct ExpressPreferenceMessage *msg)
{
struct Client *c = cls;
struct ClientPreference *cp;
GNUNET_SERVICE_client_drop (c->client);
return;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client suggested we talk to %s with preference %d at rate %u\n",
+ GNUNET_i2s (&msg->peer),
+ (int) ntohl (msg->pk),
+ (int) ntohl (msg->bw.value__));
cp = GNUNET_new (struct ClientPreference);
cp->client = c;
cp->pref.peer = msg->peer;
/**
- * We have received a `struct ExpressPreferenceMessage` from an application client.
+ * We have received a `struct ExpressPreferenceMessage` from an application client.
*
* @param cls handle to the client
* @param msg the start message
*/
static void
handle_suggest_cancel (void *cls,
- const struct ExpressPreferenceMessage *msg)
+ const struct ExpressPreferenceMessage *msg)
{
struct Client *c = cls;
struct ClientPreference *cp;
-
+
if (CT_NONE == c->type)
c->type = CT_APPLICATION;
if (CT_APPLICATION != c->type)
cp = cp->next)
if ( (cp->pref.pk == (enum GNUNET_MQ_PreferenceKind) ntohl (msg->pk)) &&
(cp->pref.bw.value__ == msg->bw.value__) &&
- (0 == memcmp (&cp->pref.peer,
- &msg->peer,
- sizeof (struct GNUNET_PeerIdentity))) )
+ (0 == GNUNET_memcmp (&cp->pref.peer,
+ &msg->peer)) )
break;
if (NULL == cp)
{
/**
- * Check 'session_add' message is well-formed and comes from a
+ * Check 'session_add' message is well-formed and comes from a
* transport client.
*
* @param cls client that sent the request
const struct SessionAddMessage *message)
{
struct Client *c = cls;
+ const char *address = (const char *) &message[1];
struct GNUNET_ATS_Session *session;
int inbound_only = (GNUNET_MESSAGE_TYPE_ATS_SESSION_ADD_INBOUND_ONLY ==
ntohs (message->header.type));
session,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
session->sh = plugin->session_add (plugin->cls,
- &session->data);
+ &session->data,
+ address);
+ GNUNET_assert (NULL != session->sh);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transport has new session %p to %s\n",
+ session,
+ GNUNET_i2s (&message->peer));
GNUNET_SERVICE_client_continue (c->client);
}
-
/**
* Handle 'session update' messages from transport clients.
*
{
struct Client *c = cls;
struct GNUNET_ATS_Session *session;
-
+
if (CT_TRANSPORT != c->type)
{
GNUNET_break (0);
GNUNET_break (0);
GNUNET_SERVICE_client_drop (c->client);
return;
- }
+ }
+ GNUNET_assert (NULL != session->sh);
plugin->session_del (plugin->cls,
session->sh,
&session->data);
+ session->sh = NULL;
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap32_remove (c->details.transport.sessions,
session->session_id,
session));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transport lost session %p to %s\n",
+ session,
+ GNUNET_i2s (&session->data.peer));
GNUNET_free (session);
GNUNET_SERVICE_client_continue (c->client);
}
(void) key;
GNUNET_assert (c == session->client);
+ GNUNET_assert (NULL != session->sh);
plugin->session_del (plugin->cls,
session->sh,
&session->data);
+ session->sh = NULL;
GNUNET_free (session);
return GNUNET_OK;
}
}
+/**
+ * Task run at the end during shutdown.
+ *
+ * @param cls unused
+ */
+static void
+final_cleanup (void *cls)
+{
+ (void) cls;
+ if (NULL != stats)
+ {
+ GNUNET_STATISTICS_destroy (stats,
+ GNUNET_NO);
+ stats = NULL;
+ }
+ if (NULL != plugin)
+ {
+ GNUNET_PLUGIN_unload (plugin_name,
+ plugin);
+ plugin = NULL;
+ }
+ if (NULL != plugin_name)
+ {
+ GNUNET_free (plugin_name);
+ plugin_name = NULL;
+ }
+}
+
+
/**
* Task run during shutdown.
*
static void
cleanup_task (void *cls)
{
+ (void) cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"ATS shutdown initiated\n");
- if (NULL != GSA_stats)
- {
- GNUNET_STATISTICS_destroy (GSA_stats,
- GNUNET_NO);
- GSA_stats = NULL;
- }
+ GNUNET_SCHEDULER_add_now (&final_cleanup,
+ NULL);
}
const struct GNUNET_CONFIGURATION_Handle *cfg,
struct GNUNET_SERVICE_Handle *service)
{
- GSA_stats = GNUNET_STATISTICS_create ("ats",
- cfg);
+ static struct GNUNET_ATS_PluginEnvironment env;
+ char *solver;
+
+ stats = GNUNET_STATISTICS_create ("ats",
+ cfg);
+ if (GNUNET_SYSERR ==
+ GNUNET_CONFIGURATION_get_value_string (cfg,
+ "ats",
+ "SOLVER",
+ &solver))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "No ATS solver configured, using 'simple' approach\n");
+ solver = GNUNET_strdup ("simple");
+ }
GNUNET_SCHEDULER_add_shutdown (&cleanup_task,
NULL);
-#if 0
- if (GNUNET_OK !=
- GAS_plugin_init (cfg))
+ env.cls = NULL;
+ env.cfg = cfg;
+ env.stats = stats;
+ env.suggest_cb = &suggest_cb;
+ env.allocate_cb = &allocate_cb;
+ GNUNET_asprintf (&plugin_name,
+ "libgnunet_plugin_ats2_%s",
+ solver);
+ GNUNET_free (solver);
+ if (NULL == (plugin = GNUNET_PLUGIN_load (plugin_name,
+ &env)))
{
- GNUNET_break (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to initialize solver `%s'!\n"),
+ plugin_name);
GNUNET_SCHEDULER_shutdown ();
return;
}
-#endif
}
&client_disconnect_cb,
NULL,
GNUNET_MQ_hd_fixed_size (suggest,
- GNUNET_MESSAGE_TYPE_ATS_SUGGEST,
- struct ExpressPreferenceMessage,
- NULL),
+ GNUNET_MESSAGE_TYPE_ATS_SUGGEST,
+ struct ExpressPreferenceMessage,
+ NULL),
GNUNET_MQ_hd_fixed_size (suggest_cancel,
- GNUNET_MESSAGE_TYPE_ATS_SUGGEST_CANCEL,
- struct ExpressPreferenceMessage,
- NULL),
+ GNUNET_MESSAGE_TYPE_ATS_SUGGEST_CANCEL,
+ struct ExpressPreferenceMessage,
+ NULL),
GNUNET_MQ_hd_fixed_size (start,
GNUNET_MESSAGE_TYPE_ATS_START,
struct GNUNET_MessageHeader,
GNUNET_MESSAGE_TYPE_ATS_SESSION_ADD_INBOUND_ONLY,
struct SessionAddMessage,
NULL),
- GNUNET_MQ_hd_fixed_size (session_update,
+ GNUNET_MQ_hd_fixed_size (session_update,
GNUNET_MESSAGE_TYPE_ATS_SESSION_UPDATE,
struct SessionUpdateMessage,
NULL),
- GNUNET_MQ_hd_fixed_size (session_del,
+ GNUNET_MQ_hd_fixed_size (session_del,
GNUNET_MESSAGE_TYPE_ATS_SESSION_DEL,
struct SessionDelMessage,
NULL),