also config files
[oweals/gnunet.git] / src / transport / transport_api_monitor_plugins.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2014, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file transport/transport_api_monitor_plugins.c
23  * @brief montoring api for transport plugin session status
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_hello_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_transport_service.h"
32 #include "transport.h"
33
34
35 /**
36  * Handle for a plugin session state monitor.
37  */
38 struct GNUNET_TRANSPORT_PluginMonitor
39 {
40   /**
41    * Connection to the service.
42    */
43   struct GNUNET_MQ_Handle *mq;
44
45   /**
46    * Our configuration.
47    */
48   const struct GNUNET_CONFIGURATION_Handle *cfg;
49
50   /**
51    * Callback to call.
52    */
53   GNUNET_TRANSPORT_SessionMonitorCallback cb;
54
55   /**
56    * Closure for @e cb
57    */
58   void *cb_cls;
59
60   /**
61    * Map of session_ids (reduced to 32-bits) to
62    * `struct GNUNET_TRANSPORT_PluginSession` objects.
63    */
64   struct GNUNET_CONTAINER_MultiHashMap32 *sessions;
65
66   /**
67    * Backoff for reconnect.
68    */
69   struct GNUNET_TIME_Relative backoff;
70
71   /**
72    * Task ID for reconnect.
73    */
74   struct GNUNET_SCHEDULER_Task *reconnect_task;
75 };
76
77
78 /**
79  * Abstract representation of a plugin's session.
80  * Corresponds to the `struct GNUNET_ATS_Session` within the TRANSPORT service.
81  */
82 struct GNUNET_TRANSPORT_PluginSession
83 {
84   /**
85    * Unique session identifier.
86    */
87   uint64_t session_id;
88
89   /**
90    * Location for the client to store "data".
91    */
92   void *client_ctx;
93 };
94
95
96 /**
97  * Task run to re-establish the connection.
98  *
99  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
100  */
101 static void
102 do_plugin_connect (void *cls);
103
104
105 /**
106  * Free the session entry and notify the callback about its demise.
107  *
108  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor`
109  * @param key key of the session in the map
110  * @param value the session to free
111  * @return #GNUNET_OK (continue to iterate)
112  */
113 static int
114 free_entry (void *cls,
115             uint32_t key,
116             void *value)
117 {
118   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
119   struct GNUNET_TRANSPORT_PluginSession *ps = value;
120
121   pm->cb (pm->cb_cls,
122           ps,
123           &ps->client_ctx,
124           NULL);
125   GNUNET_break (GNUNET_YES ==
126                 GNUNET_CONTAINER_multihashmap32_remove (pm->sessions,
127                                                         key,
128                                                         ps));
129   GNUNET_break (NULL == ps->client_ctx);
130   GNUNET_free (ps);
131   return GNUNET_OK;
132 }
133
134
135 /**
136  * Cut the existing connection and reconnect.
137  *
138  * @param pm our context
139  */
140 static void
141 reconnect_plugin_ctx (struct GNUNET_TRANSPORT_PluginMonitor *pm)
142 {
143   GNUNET_MQ_destroy (pm->mq);
144   pm->mq = NULL;
145   GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
146                                            &free_entry,
147                                            pm);
148   pm->backoff = GNUNET_TIME_STD_BACKOFF (pm->backoff);
149   pm->reconnect_task = GNUNET_SCHEDULER_add_delayed (pm->backoff,
150                                                      &do_plugin_connect,
151                                                      pm);
152 }
153
154
155 /**
156  * Convert 64-bit session ID to 32-bit index for hash map.
157  *
158  * @param id 64-bit session ID
159  * @return 32-bit hash map index
160  */
161 static uint32_t
162 wrap_id (uint64_t id)
163 {
164   return ((uint32_t) id) ^ ((uint32_t) (id >> 32));
165 }
166
167
168 /**
169  * Context for #locate_by_id().
170  */
171 struct SearchContext
172 {
173   /**
174    * Result.
175    */
176   struct GNUNET_TRANSPORT_PluginSession *ps;
177
178   /**
179    * ID to locate.
180    */
181   uint64_t session_id;
182 };
183
184
185 /**
186  * Locate a session entry.
187  *
188  * @param cls our `struct SearchContext`
189  * @param key key of the session in the map
190  * @param value a session
191  * @return #GNUNET_OK (continue to iterate), or #GNUNET_SYSERR (match found)
192  */
193 static int
194 locate_by_id (void *cls,
195               uint32_t key,
196               void *value)
197 {
198   struct SearchContext *sc = cls;
199   struct GNUNET_TRANSPORT_PluginSession *ps = value;
200
201   if (sc->session_id == ps->session_id)
202   {
203     sc->ps = ps;
204     return GNUNET_SYSERR;
205   }
206   return GNUNET_OK;
207 }
208
209
210 /**
211  * Function called with responses from the service.
212  *
213  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
214  * @paramm tpmm message with event data
215  * @return #GNUNET_Ok if message is well-formed
216  */
217 static int
218 check_event (void *cls,
219              const struct TransportPluginMonitorMessage *tpmm)
220 {
221   const char *pname;
222   size_t pname_len;
223   size_t paddr_len;
224
225   pname = (const char *) &tpmm[1];
226   pname_len = ntohs (tpmm->plugin_name_len);
227   paddr_len = ntohs (tpmm->plugin_address_len);
228   if ((pname_len
229        + paddr_len
230        + sizeof(struct TransportPluginMonitorMessage) != ntohs (
231          tpmm->header.size)) ||
232       ((0 != pname_len) &&
233        ('\0' != pname[pname_len - 1])))
234   {
235     GNUNET_break (0);
236     return GNUNET_SYSERR;
237   }
238   return GNUNET_OK;
239 }
240
241
242 /**
243  * Function called with responses from the service.
244  *
245  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
246  * @paramm tpmm message with event data
247  */
248 static void
249 handle_event (void *cls,
250               const struct TransportPluginMonitorMessage *tpmm)
251 {
252   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
253   struct GNUNET_TRANSPORT_PluginSession *ps;
254   const char *pname;
255   const void *paddr;
256   enum GNUNET_TRANSPORT_SessionState ss;
257   size_t pname_len;
258   size_t paddr_len;
259   struct GNUNET_TRANSPORT_SessionInfo info;
260   struct GNUNET_HELLO_Address addr;
261   struct SearchContext rv;
262
263   pname = (const char *) &tpmm[1];
264   pname_len = ntohs (tpmm->plugin_name_len);
265   paddr_len = ntohs (tpmm->plugin_address_len);
266   paddr = &pname[pname_len];
267   ps = NULL;
268   ss = (enum GNUNET_TRANSPORT_SessionState) ntohs (tpmm->session_state);
269   if (GNUNET_TRANSPORT_SS_INIT == ss)
270   {
271     ps = GNUNET_new (struct GNUNET_TRANSPORT_PluginSession);
272     ps->session_id = tpmm->session_id;
273     (void) GNUNET_CONTAINER_multihashmap32_put (pm->sessions,
274                                                 wrap_id (tpmm->session_id),
275                                                 ps,
276                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
277   }
278   else
279   {
280     rv.session_id = tpmm->session_id;
281     rv.ps = NULL;
282     (void) GNUNET_CONTAINER_multihashmap32_get_multiple (pm->sessions,
283                                                          wrap_id (
284                                                            tpmm->session_id),
285                                                          &locate_by_id,
286                                                          &rv);
287     ps = rv.ps;
288     if (NULL == ps)
289     {
290       GNUNET_break (0);
291       reconnect_plugin_ctx (pm);
292       return;
293     }
294   }
295   info.state = ss;
296   info.is_inbound = (int16_t) ntohs (tpmm->is_inbound);
297   info.num_msg_pending = ntohl (tpmm->msgs_pending);
298   info.num_bytes_pending = ntohl (tpmm->bytes_pending);
299   info.receive_delay = GNUNET_TIME_absolute_ntoh (tpmm->delay);
300   info.session_timeout = GNUNET_TIME_absolute_ntoh (tpmm->timeout);
301   info.address = &addr;
302   addr.peer = tpmm->peer;
303   addr.address = (0 == paddr_len) ? NULL : paddr;
304   addr.address_length = paddr_len;
305   addr.transport_name = (0 == pname_len) ? NULL : pname;
306   addr.local_info = GNUNET_HELLO_ADDRESS_INFO_NONE;
307   pm->cb (pm->cb_cls,
308           ps,
309           &ps->client_ctx,
310           &info);
311
312   if (GNUNET_TRANSPORT_SS_DONE == ss)
313   {
314     GNUNET_break (NULL == ps->client_ctx);
315     GNUNET_assert (GNUNET_YES ==
316                    GNUNET_CONTAINER_multihashmap32_remove (pm->sessions,
317                                                            wrap_id (
318                                                              tpmm->session_id),
319                                                            ps));
320     GNUNET_free (ps);
321   }
322 }
323
324
325 /**
326  * Function called with sync responses from the service.
327  *
328  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
329  * @param msg message from the service
330  */
331 static void
332 handle_sync (void *cls,
333              const struct GNUNET_MessageHeader *msg)
334 {
335   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
336
337   /* we are in sync, notify callback */
338   pm->cb (pm->cb_cls,
339           NULL,
340           NULL,
341           NULL);
342 }
343
344
345 /**
346  * Generic error handler, called with the appropriate
347  * error code and the same closure specified at the creation of
348  * the message queue.
349  * Not every message queue implementation supports an error handler.
350  *
351  * @param cls closure with the `struct GNUNET_NSE_Handle *`
352  * @param error error code
353  */
354 static void
355 mq_error_handler (void *cls,
356                   enum GNUNET_MQ_Error error)
357 {
358   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
359
360   reconnect_plugin_ctx (pm);
361 }
362
363
364 /**
365  * Task run to re-establish the connection.
366  *
367  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
368  */
369 static void
370 do_plugin_connect (void *cls)
371 {
372   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
373   struct GNUNET_MQ_MessageHandler handlers[] = {
374     GNUNET_MQ_hd_var_size (event,
375                            GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT,
376                            struct TransportPluginMonitorMessage,
377                            pm),
378     GNUNET_MQ_hd_fixed_size (sync,
379                              GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC,
380                              struct GNUNET_MessageHeader,
381                              pm),
382     GNUNET_MQ_handler_end ()
383   };
384   struct GNUNET_MessageHeader *msg;
385   struct GNUNET_MQ_Envelope *env;
386
387   pm->reconnect_task = NULL;
388   pm->mq = GNUNET_CLIENT_connect (pm->cfg,
389                                   "transport",
390                                   handlers,
391                                   &mq_error_handler,
392                                   pm);
393   if (NULL == pm->mq)
394     return;
395   env = GNUNET_MQ_msg (msg,
396                        GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START);
397   GNUNET_MQ_send (pm->mq,
398                   env);
399 }
400
401
402 /**
403  * Install a plugin session state monitor callback.  The callback
404  * will be notified whenever the session changes.
405  *
406  * @param cfg configuration to use
407  * @param cb callback to invoke on events
408  * @param cb_cls closure for @a cb
409  * @return NULL on error, otherwise handle for cancellation
410  */
411 struct GNUNET_TRANSPORT_PluginMonitor *
412 GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg,
413                                   GNUNET_TRANSPORT_SessionMonitorCallback cb,
414                                   void *cb_cls)
415 {
416   struct GNUNET_TRANSPORT_PluginMonitor *pm;
417
418   pm = GNUNET_new (struct GNUNET_TRANSPORT_PluginMonitor);
419   pm->cb = cb;
420   pm->cb_cls = cb_cls;
421   pm->cfg = cfg;
422   do_plugin_connect (pm);
423   if (NULL == pm->mq)
424   {
425     GNUNET_free (pm);
426     return NULL;
427   }
428   pm->sessions = GNUNET_CONTAINER_multihashmap32_create (128);
429   return pm;
430 }
431
432
433 /**
434  * Cancel monitoring the plugin session state.  The callback will
435  * be called once for each session that is up with the information
436  * #GNUNET_TRANSPORT_SS_FINI (even though the session may stay up;
437  * this is just to enable client-side cleanup).
438  *
439  * @param pm handle of the request that is to be cancelled
440  */
441 void
442 GNUNET_TRANSPORT_monitor_plugins_cancel (struct
443                                          GNUNET_TRANSPORT_PluginMonitor *pm)
444 {
445   if (NULL != pm->mq)
446   {
447     GNUNET_MQ_destroy (pm->mq);
448     pm->mq = NULL;
449   }
450   if (NULL != pm->reconnect_task)
451   {
452     GNUNET_SCHEDULER_cancel (pm->reconnect_task);
453     pm->reconnect_task = NULL;
454   }
455   GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
456                                            &free_entry,
457                                            pm);
458   GNUNET_CONTAINER_multihashmap32_destroy (pm->sessions);
459   GNUNET_free (pm);
460 }
461
462
463 /* end of transport_api_monitor_plugins.c */