2 This file is part of GNUnet.
3 Copyright (C) 2014, 2016 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL-3.0-or-later
22 * @file transport/transport_api_monitor_plugins.c
23 * @brief monitoring API for transport plugin session status
24 * @author Christian Grothoff
27 #include "gnunet_util_lib.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_hello_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_transport_service.h"
32 #include "transport.h"
36 * Handle for a plugin session state monitor.
38 struct GNUNET_TRANSPORT_PluginMonitor
42 * Connection to the service.
44 struct GNUNET_MQ_Handle *mq;
49 const struct GNUNET_CONFIGURATION_Handle *cfg;
54 GNUNET_TRANSPORT_SessionMonitorCallback cb;
62 * Map of session_ids (reduced to 32-bits) to
63 * `struct GNUNET_TRANSPORT_PluginSession` objects.
65 struct GNUNET_CONTAINER_MultiHashMap32 *sessions;
68 * Backoff for reconnect.
70 struct GNUNET_TIME_Relative backoff;
73 * Task ID for reconnect.
75 struct GNUNET_SCHEDULER_Task *reconnect_task;
81 * Abstract representation of a plugin's session.
82 * Corresponds to the `struct GNUNET_ATS_Session` within the TRANSPORT service.
84 struct GNUNET_TRANSPORT_PluginSession
87 * Unique session identifier.
92 * Location for the client to store "data".
100 * Task run to re-establish the connection.
102 * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
105 do_plugin_connect (void *cls);
109 * Free the session entry and notify the callback about its demise.
111 * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor`
112 * @param key key of the session in the map
113 * @param value the session to free
114 * @return #GNUNET_OK (continue to iterate)
117 free_entry (void *cls,
121 struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
122 struct GNUNET_TRANSPORT_PluginSession *ps = value;
128 GNUNET_break (GNUNET_YES ==
129 GNUNET_CONTAINER_multihashmap32_remove (pm->sessions,
132 GNUNET_break (NULL == ps->client_ctx);
139 * Cut the existing connection and reconnect.
141 * @param pm our context
144 reconnect_plugin_ctx (struct GNUNET_TRANSPORT_PluginMonitor *pm)
146 GNUNET_MQ_destroy (pm->mq);
148 GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
151 pm->backoff = GNUNET_TIME_STD_BACKOFF (pm->backoff);
152 pm->reconnect_task = GNUNET_SCHEDULER_add_delayed (pm->backoff,
/**
 * Convert 64-bit session ID to 32-bit index for hash map.
 *
 * The upper 32 bits of @a id are folded onto the lower 32 bits with XOR,
 * so both halves of the session ID contribute to the index.  Distinct
 * 64-bit IDs can therefore yield the same index; callers must still
 * compare the full session ID of the entries stored under that index.
 *
 * @param id 64-bit session ID
 * @return 32-bit hash map index
 */
static uint32_t
wrap_id (uint64_t id)
{
  return ((uint32_t) id) ^ ((uint32_t) (id >> 32));
}
172 * Context for #locate_by_id().
180 struct GNUNET_TRANSPORT_PluginSession *ps;
191 * Locate a session entry.
193 * @param cls our `struct SearchContext`
194 * @param key key of the session in the map
195 * @param value a session
196 * @return #GNUNET_OK (continue to iterate), or #GNUNET_SYSERR (match found)
199 locate_by_id (void *cls,
203 struct SearchContext *sc = cls;
204 struct GNUNET_TRANSPORT_PluginSession *ps = value;
206 if (sc->session_id == ps->session_id)
209 return GNUNET_SYSERR;
216 * Function called with responses from the service.
218 * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
219 * @param tpmm message with event data
220 * @return #GNUNET_OK if message is well-formed, #GNUNET_SYSERR otherwise
223 check_event (void *cls,
224 const struct TransportPluginMonitorMessage *tpmm)
230 pname = (const char *) &tpmm[1];
231 pname_len = ntohs (tpmm->plugin_name_len);
232 paddr_len = ntohs (tpmm->plugin_address_len);
235 sizeof (struct TransportPluginMonitorMessage) != ntohs (tpmm->header.size)) ||
236 ( (0 != pname_len) &&
237 ('\0' != pname[pname_len - 1]) ) )
240 return GNUNET_SYSERR;
247 * Function called with responses from the service.
249 * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
250 * @param tpmm message with event data
253 handle_event (void *cls,
254 const struct TransportPluginMonitorMessage *tpmm)
256 struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
257 struct GNUNET_TRANSPORT_PluginSession *ps;
260 enum GNUNET_TRANSPORT_SessionState ss;
263 struct GNUNET_TRANSPORT_SessionInfo info;
264 struct GNUNET_HELLO_Address addr;
265 struct SearchContext rv;
267 pname = (const char *) &tpmm[1];
268 pname_len = ntohs (tpmm->plugin_name_len);
269 paddr_len = ntohs (tpmm->plugin_address_len);
270 paddr = &pname[pname_len];
272 ss = (enum GNUNET_TRANSPORT_SessionState) ntohs (tpmm->session_state);
273 if (GNUNET_TRANSPORT_SS_INIT == ss)
275 ps = GNUNET_new (struct GNUNET_TRANSPORT_PluginSession);
276 ps->session_id = tpmm->session_id;
277 (void) GNUNET_CONTAINER_multihashmap32_put (pm->sessions,
278 wrap_id (tpmm->session_id),
280 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
285 rv.session_id = tpmm->session_id;
287 (void) GNUNET_CONTAINER_multihashmap32_get_multiple (pm->sessions,
288 wrap_id (tpmm->session_id),
295 reconnect_plugin_ctx (pm);
300 info.is_inbound = (int16_t) ntohs (tpmm->is_inbound);
301 info.num_msg_pending = ntohl (tpmm->msgs_pending);
302 info.num_bytes_pending = ntohl (tpmm->bytes_pending);
303 info.receive_delay = GNUNET_TIME_absolute_ntoh (tpmm->delay);
304 info.session_timeout = GNUNET_TIME_absolute_ntoh (tpmm->timeout);
305 info.address = &addr;
306 addr.peer = tpmm->peer;
307 addr.address = (0 == paddr_len) ? NULL : paddr;
308 addr.address_length = paddr_len;
309 addr.transport_name = (0 == pname_len) ? NULL : pname;
310 addr.local_info = GNUNET_HELLO_ADDRESS_INFO_NONE;
316 if (GNUNET_TRANSPORT_SS_DONE == ss)
318 GNUNET_break (NULL == ps->client_ctx);
319 GNUNET_assert (GNUNET_YES ==
320 GNUNET_CONTAINER_multihashmap32_remove (pm->sessions,
321 wrap_id (tpmm->session_id),
329 * Function called with sync responses from the service.
331 * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
332 * @param msg message from the service
335 handle_sync (void *cls,
336 const struct GNUNET_MessageHeader *msg)
338 struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
340 /* we are in sync, notify callback */
349 * Generic error handler, called with the appropriate
350 * error code and the same closure specified at the creation of the message queue.
352 * Not every message queue implementation supports an error handler.
354 * @param cls closure with the `struct GNUNET_TRANSPORT_PluginMonitor *`
355 * @param error error code
358 mq_error_handler (void *cls,
359 enum GNUNET_MQ_Error error)
361 struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
363 reconnect_plugin_ctx (pm);
368 * Task run to re-establish the connection.
370 * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
373 do_plugin_connect (void *cls)
375 struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
376 struct GNUNET_MQ_MessageHandler handlers[] = {
377 GNUNET_MQ_hd_var_size (event,
378 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT,
379 struct TransportPluginMonitorMessage,
381 GNUNET_MQ_hd_fixed_size (sync,
382 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC,
383 struct GNUNET_MessageHeader,
385 GNUNET_MQ_handler_end ()
387 struct GNUNET_MessageHeader *msg;
388 struct GNUNET_MQ_Envelope *env;
390 pm->reconnect_task = NULL;
391 pm->mq = GNUNET_CLIENT_connect (pm->cfg,
398 env = GNUNET_MQ_msg (msg,
399 GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START);
400 GNUNET_MQ_send (pm->mq,
406 * Install a plugin session state monitor callback. The callback
407 * will be notified whenever the session changes.
409 * @param cfg configuration to use
410 * @param cb callback to invoke on events
411 * @param cb_cls closure for @a cb
412 * @return NULL on error, otherwise handle for cancellation
414 struct GNUNET_TRANSPORT_PluginMonitor *
415 GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg,
416 GNUNET_TRANSPORT_SessionMonitorCallback cb,
419 struct GNUNET_TRANSPORT_PluginMonitor *pm;
421 pm = GNUNET_new (struct GNUNET_TRANSPORT_PluginMonitor);
425 do_plugin_connect (pm);
431 pm->sessions = GNUNET_CONTAINER_multihashmap32_create (128);
437 * Cancel monitoring the plugin session state. The callback will
438 * be called once for each session that is up with the information
439 * #GNUNET_TRANSPORT_SS_FINI (even though the session may stay up;
440 * this is just to enable client-side cleanup).
442 * @param pm handle of the request that is to be cancelled
445 GNUNET_TRANSPORT_monitor_plugins_cancel (struct GNUNET_TRANSPORT_PluginMonitor *pm)
449 GNUNET_MQ_destroy (pm->mq);
452 if (NULL != pm->reconnect_task)
454 GNUNET_SCHEDULER_cancel (pm->reconnect_task);
455 pm->reconnect_task = NULL;
457 GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
460 GNUNET_CONTAINER_multihashmap32_destroy (pm->sessions);
465 /* end of transport_api_monitor_plugins.c */