tolerate additional IPv4 address now available for gnunet.org
[oweals/gnunet.git] / src / transport / transport_api_monitor_plugins.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2014, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file transport/transport_api_monitor_plugins.c
23  * @brief montoring api for transport plugin session status
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_hello_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_transport_service.h"
32 #include "transport.h"
33
34
35 /**
36  * Handle for a plugin session state monitor.
37  */
38 struct GNUNET_TRANSPORT_PluginMonitor
39 {
40
41   /**
42    * Connection to the service.
43    */
44   struct GNUNET_MQ_Handle *mq;
45
46   /**
47    * Our configuration.
48    */
49   const struct GNUNET_CONFIGURATION_Handle *cfg;
50
51   /**
52    * Callback to call.
53    */
54   GNUNET_TRANSPORT_SessionMonitorCallback cb;
55
56   /**
57    * Closure for @e cb
58    */
59   void *cb_cls;
60
61   /**
62    * Map of session_ids (reduced to 32-bits) to
63    * `struct GNUNET_TRANSPORT_PluginSession` objects.
64    */
65   struct GNUNET_CONTAINER_MultiHashMap32 *sessions;
66
67   /**
68    * Backoff for reconnect.
69    */
70   struct GNUNET_TIME_Relative backoff;
71
72   /**
73    * Task ID for reconnect.
74    */
75   struct GNUNET_SCHEDULER_Task *reconnect_task;
76
77 };
78
79
80 /**
81  * Abstract representation of a plugin's session.
82  * Corresponds to the `struct GNUNET_ATS_Session` within the TRANSPORT service.
83  */
84 struct GNUNET_TRANSPORT_PluginSession
85 {
86   /**
87    * Unique session identifier.
88    */
89   uint64_t session_id;
90
91   /**
92    * Location for the client to store "data".
93    */
94   void *client_ctx;
95 };
96
97
98
99 /**
100  * Task run to re-establish the connection.
101  *
102  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
103  */
104 static void
105 do_plugin_connect (void *cls);
106
107
108 /**
109  * Free the session entry and notify the callback about its demise.
110  *
111  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor`
112  * @param key key of the session in the map
113  * @param value the session to free
114  * @return #GNUNET_OK (continue to iterate)
115  */
116 static int
117 free_entry (void *cls,
118             uint32_t key,
119             void *value)
120 {
121   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
122   struct GNUNET_TRANSPORT_PluginSession *ps = value;
123
124   pm->cb (pm->cb_cls,
125           ps,
126           &ps->client_ctx,
127           NULL);
128   GNUNET_break (GNUNET_YES ==
129                 GNUNET_CONTAINER_multihashmap32_remove (pm->sessions,
130                                                         key,
131                                                         ps));
132   GNUNET_break (NULL == ps->client_ctx);
133   GNUNET_free (ps);
134   return GNUNET_OK;
135 }
136
137
138 /**
139  * Cut the existing connection and reconnect.
140  *
141  * @param pm our context
142  */
143 static void
144 reconnect_plugin_ctx (struct GNUNET_TRANSPORT_PluginMonitor *pm)
145 {
146   GNUNET_MQ_destroy (pm->mq);
147   pm->mq = NULL;
148   GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
149                                            &free_entry,
150                                            pm);
151   pm->backoff = GNUNET_TIME_STD_BACKOFF (pm->backoff);
152   pm->reconnect_task = GNUNET_SCHEDULER_add_delayed (pm->backoff,
153                                                      &do_plugin_connect,
154                                                      pm);
155 }
156
157
158 /**
159  * Convert 64-bit session ID to 32-bit index for hash map.
160  *
161  * @param id 64-bit session ID
162  * @return 32-bit hash map index
163  */
164 static uint32_t
165 wrap_id (uint64_t id)
166 {
167   return ((uint32_t) id) ^ ((uint32_t) (id >> 32));
168 }
169
170
171 /**
172  * Context for #locate_by_id().
173  */
174 struct SearchContext
175 {
176
177   /**
178    * Result.
179    */
180   struct GNUNET_TRANSPORT_PluginSession *ps;
181
182   /**
183    * ID to locate.
184    */
185   uint64_t session_id;
186
187 };
188
189
190 /**
191  * Locate a session entry.
192  *
193  * @param cls our `struct SearchContext`
194  * @param key key of the session in the map
195  * @param value a session
196  * @return #GNUNET_OK (continue to iterate), or #GNUNET_SYSERR (match found)
197  */
198 static int
199 locate_by_id (void *cls,
200               uint32_t key,
201               void *value)
202 {
203   struct SearchContext *sc = cls;
204   struct GNUNET_TRANSPORT_PluginSession *ps = value;
205
206   if (sc->session_id == ps->session_id)
207   {
208     sc->ps = ps;
209     return GNUNET_SYSERR;
210   }
211   return GNUNET_OK;
212 }
213
214
215 /**
216  * Function called with responses from the service.
217  *
218  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
219  * @paramm tpmm message with event data
220  * @return #GNUNET_Ok if message is well-formed
221  */
222 static int
223 check_event (void *cls,
224              const struct TransportPluginMonitorMessage *tpmm)
225 {
226   const char *pname;
227   size_t pname_len;
228   size_t paddr_len;
229
230   pname = (const char *) &tpmm[1];
231   pname_len = ntohs (tpmm->plugin_name_len);
232   paddr_len = ntohs (tpmm->plugin_address_len);
233   if ( (pname_len +
234         paddr_len +
235         sizeof (struct TransportPluginMonitorMessage) != ntohs (tpmm->header.size)) ||
236        ( (0 != pname_len) &&
237          ('\0' != pname[pname_len - 1]) ) )
238   {
239     GNUNET_break (0);
240     return GNUNET_SYSERR;
241   }
242   return GNUNET_OK;
243 }
244
245
246 /**
247  * Function called with responses from the service.
248  *
249  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
250  * @paramm tpmm message with event data
251  */
252 static void
253 handle_event (void *cls,
254               const struct TransportPluginMonitorMessage *tpmm)
255 {
256   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
257   struct GNUNET_TRANSPORT_PluginSession *ps;
258   const char *pname;
259   const void *paddr;
260   enum GNUNET_TRANSPORT_SessionState ss;
261   size_t pname_len;
262   size_t paddr_len;
263   struct GNUNET_TRANSPORT_SessionInfo info;
264   struct GNUNET_HELLO_Address addr;
265   struct SearchContext rv;
266
267   pname = (const char *) &tpmm[1];
268   pname_len = ntohs (tpmm->plugin_name_len);
269   paddr_len = ntohs (tpmm->plugin_address_len);
270   paddr = &pname[pname_len];
271   ps = NULL;
272   ss = (enum GNUNET_TRANSPORT_SessionState) ntohs (tpmm->session_state);
273   if (GNUNET_TRANSPORT_SS_INIT == ss)
274   {
275     ps = GNUNET_new (struct GNUNET_TRANSPORT_PluginSession);
276     ps->session_id = tpmm->session_id;
277     (void) GNUNET_CONTAINER_multihashmap32_put (pm->sessions,
278                                                 wrap_id (tpmm->session_id),
279                                                 ps,
280                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
281
282   }
283   else
284   {
285     rv.session_id = tpmm->session_id;
286     rv.ps = NULL;
287     (void) GNUNET_CONTAINER_multihashmap32_get_multiple (pm->sessions,
288                                                          wrap_id (tpmm->session_id),
289                                                          &locate_by_id,
290                                                          &rv);
291     ps = rv.ps;
292     if (NULL == ps)
293     {
294       GNUNET_break (0);
295       reconnect_plugin_ctx (pm);
296       return;
297     }
298   }
299   info.state = ss;
300   info.is_inbound = (int16_t) ntohs (tpmm->is_inbound);
301   info.num_msg_pending = ntohl (tpmm->msgs_pending);
302   info.num_bytes_pending = ntohl (tpmm->bytes_pending);
303   info.receive_delay = GNUNET_TIME_absolute_ntoh (tpmm->delay);
304   info.session_timeout = GNUNET_TIME_absolute_ntoh (tpmm->timeout);
305   info.address = &addr;
306   addr.peer = tpmm->peer;
307   addr.address = (0 == paddr_len) ? NULL : paddr;
308   addr.address_length = paddr_len;
309   addr.transport_name = (0 == pname_len) ? NULL : pname;
310   addr.local_info = GNUNET_HELLO_ADDRESS_INFO_NONE;
311   pm->cb (pm->cb_cls,
312           ps,
313           &ps->client_ctx,
314           &info);
315
316   if (GNUNET_TRANSPORT_SS_DONE == ss)
317   {
318     GNUNET_break (NULL == ps->client_ctx);
319     GNUNET_assert (GNUNET_YES ==
320                    GNUNET_CONTAINER_multihashmap32_remove (pm->sessions,
321                                                            wrap_id (tpmm->session_id),
322                                                            ps));
323     GNUNET_free (ps);
324   }
325 }
326
327
328 /**
329  * Function called with sync responses from the service.
330  *
331  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
332  * @param msg message from the service
333  */
334 static void
335 handle_sync (void *cls,
336              const struct GNUNET_MessageHeader *msg)
337 {
338   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
339
340   /* we are in sync, notify callback */
341   pm->cb (pm->cb_cls,
342           NULL,
343           NULL,
344           NULL);
345 }
346
347
348 /**
349  * Generic error handler, called with the appropriate
350  * error code and the same closure specified at the creation of
351  * the message queue.
352  * Not every message queue implementation supports an error handler.
353  *
354  * @param cls closure with the `struct GNUNET_NSE_Handle *`
355  * @param error error code
356  */
357 static void
358 mq_error_handler (void *cls,
359                   enum GNUNET_MQ_Error error)
360 {
361   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
362
363   reconnect_plugin_ctx (pm);
364 }
365
366
367 /**
368  * Task run to re-establish the connection.
369  *
370  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
371  */
372 static void
373 do_plugin_connect (void *cls)
374 {
375   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
376   struct GNUNET_MQ_MessageHandler handlers[] = {
377     GNUNET_MQ_hd_var_size (event,
378                            GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT,
379                            struct TransportPluginMonitorMessage,
380                            pm),
381     GNUNET_MQ_hd_fixed_size (sync,
382                              GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC,
383                              struct GNUNET_MessageHeader,
384                              pm),
385     GNUNET_MQ_handler_end ()
386   };
387   struct GNUNET_MessageHeader *msg;
388   struct GNUNET_MQ_Envelope *env;
389
390   pm->reconnect_task = NULL;
391   pm->mq = GNUNET_CLIENT_connect (pm->cfg,
392                                   "transport",
393                                   handlers,
394                                   &mq_error_handler,
395                                   pm);
396   if (NULL == pm->mq)
397     return;
398   env = GNUNET_MQ_msg (msg,
399                        GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START);
400   GNUNET_MQ_send (pm->mq,
401                   env);
402 }
403
404
405 /**
406  * Install a plugin session state monitor callback.  The callback
407  * will be notified whenever the session changes.
408  *
409  * @param cfg configuration to use
410  * @param cb callback to invoke on events
411  * @param cb_cls closure for @a cb
412  * @return NULL on error, otherwise handle for cancellation
413  */
414 struct GNUNET_TRANSPORT_PluginMonitor *
415 GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg,
416                                   GNUNET_TRANSPORT_SessionMonitorCallback cb,
417                                   void *cb_cls)
418 {
419   struct GNUNET_TRANSPORT_PluginMonitor *pm;
420
421   pm = GNUNET_new (struct GNUNET_TRANSPORT_PluginMonitor);
422   pm->cb = cb;
423   pm->cb_cls = cb_cls;
424   pm->cfg = cfg;
425   do_plugin_connect (pm);
426   if (NULL == pm->mq)
427   {
428     GNUNET_free (pm);
429     return NULL;
430   }
431   pm->sessions = GNUNET_CONTAINER_multihashmap32_create (128);
432   return pm;
433 }
434
435
436 /**
437  * Cancel monitoring the plugin session state.  The callback will
438  * be called once for each session that is up with the information
439  * #GNUNET_TRANSPORT_SS_FINI (even though the session may stay up;
440  * this is just to enable client-side cleanup).
441  *
442  * @param pm handle of the request that is to be cancelled
443  */
444 void
445 GNUNET_TRANSPORT_monitor_plugins_cancel (struct GNUNET_TRANSPORT_PluginMonitor *pm)
446 {
447   if (NULL != pm->mq)
448   {
449     GNUNET_MQ_destroy (pm->mq);
450     pm->mq = NULL;
451   }
452   if (NULL != pm->reconnect_task)
453   {
454     GNUNET_SCHEDULER_cancel (pm->reconnect_task);
455     pm->reconnect_task = NULL;
456   }
457   GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
458                                            &free_entry,
459                                            pm);
460   GNUNET_CONTAINER_multihashmap32_destroy (pm->sessions);
461   GNUNET_free (pm);
462 }
463
464
465 /* end of transport_api_monitor_plugins.c */