glitch in the license text detected by hyazinthe, thank you!
[oweals/gnunet.git] / src / transport / transport_api_monitor_plugins.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2014, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14 */
15
16 /**
17  * @file transport/transport_api_monitor_plugins.c
18  * @brief montoring api for transport plugin session status
19  * @author Christian Grothoff
20  */
21 #include "platform.h"
22 #include "gnunet_util_lib.h"
23 #include "gnunet_arm_service.h"
24 #include "gnunet_hello_lib.h"
25 #include "gnunet_protocols.h"
26 #include "gnunet_transport_service.h"
27 #include "transport.h"
28
29
30 /**
31  * Handle for a plugin session state monitor.
32  */
33 struct GNUNET_TRANSPORT_PluginMonitor
34 {
35
36   /**
37    * Connection to the service.
38    */
39   struct GNUNET_MQ_Handle *mq;
40
41   /**
42    * Our configuration.
43    */
44   const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46   /**
47    * Callback to call.
48    */
49   GNUNET_TRANSPORT_SessionMonitorCallback cb;
50
51   /**
52    * Closure for @e cb
53    */
54   void *cb_cls;
55
56   /**
57    * Map of session_ids (reduced to 32-bits) to
58    * `struct GNUNET_TRANSPORT_PluginSession` objects.
59    */
60   struct GNUNET_CONTAINER_MultiHashMap32 *sessions;
61
62   /**
63    * Backoff for reconnect.
64    */
65   struct GNUNET_TIME_Relative backoff;
66
67   /**
68    * Task ID for reconnect.
69    */
70   struct GNUNET_SCHEDULER_Task *reconnect_task;
71
72 };
73
74
75 /**
76  * Abstract representation of a plugin's session.
77  * Corresponds to the `struct GNUNET_ATS_Session` within the TRANSPORT service.
78  */
79 struct GNUNET_TRANSPORT_PluginSession
80 {
81   /**
82    * Unique session identifier.
83    */
84   uint64_t session_id;
85
86   /**
87    * Location for the client to store "data".
88    */
89   void *client_ctx;
90 };
91
92
93
94 /**
95  * Task run to re-establish the connection.
96  *
97  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
98  */
99 static void
100 do_plugin_connect (void *cls);
101
102
103 /**
104  * Free the session entry and notify the callback about its demise.
105  *
106  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor`
107  * @param key key of the session in the map
108  * @param value the session to free
109  * @return #GNUNET_OK (continue to iterate)
110  */
111 static int
112 free_entry (void *cls,
113             uint32_t key,
114             void *value)
115 {
116   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
117   struct GNUNET_TRANSPORT_PluginSession *ps = value;
118
119   pm->cb (pm->cb_cls,
120           ps,
121           &ps->client_ctx,
122           NULL);
123   GNUNET_break (GNUNET_YES ==
124                 GNUNET_CONTAINER_multihashmap32_remove (pm->sessions,
125                                                         key,
126                                                         ps));
127   GNUNET_break (NULL == ps->client_ctx);
128   GNUNET_free (ps);
129   return GNUNET_OK;
130 }
131
132
133 /**
134  * Cut the existing connection and reconnect.
135  *
136  * @param pm our context
137  */
138 static void
139 reconnect_plugin_ctx (struct GNUNET_TRANSPORT_PluginMonitor *pm)
140 {
141   GNUNET_MQ_destroy (pm->mq);
142   pm->mq = NULL;
143   GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
144                                            &free_entry,
145                                            pm);
146   pm->backoff = GNUNET_TIME_STD_BACKOFF (pm->backoff);
147   pm->reconnect_task = GNUNET_SCHEDULER_add_delayed (pm->backoff,
148                                                      &do_plugin_connect,
149                                                      pm);
150 }
151
152
153 /**
154  * Convert 64-bit session ID to 32-bit index for hash map.
155  *
156  * @param id 64-bit session ID
157  * @return 32-bit hash map index
158  */
159 static uint32_t
160 wrap_id (uint64_t id)
161 {
162   return ((uint32_t) id) ^ ((uint32_t) (id >> 32));
163 }
164
165
166 /**
167  * Context for #locate_by_id().
168  */
169 struct SearchContext
170 {
171
172   /**
173    * Result.
174    */
175   struct GNUNET_TRANSPORT_PluginSession *ps;
176
177   /**
178    * ID to locate.
179    */
180   uint64_t session_id;
181
182 };
183
184
185 /**
186  * Locate a session entry.
187  *
188  * @param cls our `struct SearchContext`
189  * @param key key of the session in the map
190  * @param value a session
191  * @return #GNUNET_OK (continue to iterate), or #GNUNET_SYSERR (match found)
192  */
193 static int
194 locate_by_id (void *cls,
195               uint32_t key,
196               void *value)
197 {
198   struct SearchContext *sc = cls;
199   struct GNUNET_TRANSPORT_PluginSession *ps = value;
200
201   if (sc->session_id == ps->session_id)
202   {
203     sc->ps = ps;
204     return GNUNET_SYSERR;
205   }
206   return GNUNET_OK;
207 }
208
209
210 /**
211  * Function called with responses from the service.
212  *
213  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
214  * @paramm tpmm message with event data
215  * @return #GNUNET_Ok if message is well-formed
216  */
217 static int
218 check_event (void *cls,
219              const struct TransportPluginMonitorMessage *tpmm)
220 {
221   const char *pname;
222   size_t pname_len;
223   size_t paddr_len;
224
225   pname = (const char *) &tpmm[1];
226   pname_len = ntohs (tpmm->plugin_name_len);
227   paddr_len = ntohs (tpmm->plugin_address_len);
228   if ( (pname_len +
229         paddr_len +
230         sizeof (struct TransportPluginMonitorMessage) != ntohs (tpmm->header.size)) ||
231        ( (0 != pname_len) &&
232          ('\0' != pname[pname_len - 1]) ) )
233   {
234     GNUNET_break (0);
235     return GNUNET_SYSERR;
236   }
237   return GNUNET_OK;
238 }
239
240
241 /**
242  * Function called with responses from the service.
243  *
244  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
245  * @paramm tpmm message with event data
246  */
247 static void
248 handle_event (void *cls,
249               const struct TransportPluginMonitorMessage *tpmm)
250 {
251   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
252   struct GNUNET_TRANSPORT_PluginSession *ps;
253   const char *pname;
254   const void *paddr;
255   enum GNUNET_TRANSPORT_SessionState ss;
256   size_t pname_len;
257   size_t paddr_len;
258   struct GNUNET_TRANSPORT_SessionInfo info;
259   struct GNUNET_HELLO_Address addr;
260   struct SearchContext rv;
261
262   pname = (const char *) &tpmm[1];
263   pname_len = ntohs (tpmm->plugin_name_len);
264   paddr_len = ntohs (tpmm->plugin_address_len);
265   paddr = &pname[pname_len];
266   ps = NULL;
267   ss = (enum GNUNET_TRANSPORT_SessionState) ntohs (tpmm->session_state);
268   if (GNUNET_TRANSPORT_SS_INIT == ss)
269   {
270     ps = GNUNET_new (struct GNUNET_TRANSPORT_PluginSession);
271     ps->session_id = tpmm->session_id;
272     (void) GNUNET_CONTAINER_multihashmap32_put (pm->sessions,
273                                                 wrap_id (tpmm->session_id),
274                                                 ps,
275                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
276
277   }
278   else
279   {
280     rv.session_id = tpmm->session_id;
281     rv.ps = NULL;
282     (void) GNUNET_CONTAINER_multihashmap32_get_multiple (pm->sessions,
283                                                          wrap_id (tpmm->session_id),
284                                                          &locate_by_id,
285                                                          &rv);
286     ps = rv.ps;
287     if (NULL == ps)
288     {
289       GNUNET_break (0);
290       reconnect_plugin_ctx (pm);
291       return;
292     }
293   }
294   info.state = ss;
295   info.is_inbound = (int16_t) ntohs (tpmm->is_inbound);
296   info.num_msg_pending = ntohl (tpmm->msgs_pending);
297   info.num_bytes_pending = ntohl (tpmm->bytes_pending);
298   info.receive_delay = GNUNET_TIME_absolute_ntoh (tpmm->delay);
299   info.session_timeout = GNUNET_TIME_absolute_ntoh (tpmm->timeout);
300   info.address = &addr;
301   addr.peer = tpmm->peer;
302   addr.address = (0 == paddr_len) ? NULL : paddr;
303   addr.address_length = paddr_len;
304   addr.transport_name = (0 == pname_len) ? NULL : pname;
305   addr.local_info = GNUNET_HELLO_ADDRESS_INFO_NONE;
306   pm->cb (pm->cb_cls,
307           ps,
308           &ps->client_ctx,
309           &info);
310
311   if (GNUNET_TRANSPORT_SS_DONE == ss)
312   {
313     GNUNET_break (NULL == ps->client_ctx);
314     GNUNET_assert (GNUNET_YES ==
315                    GNUNET_CONTAINER_multihashmap32_remove (pm->sessions,
316                                                            wrap_id (tpmm->session_id),
317                                                            ps));
318     GNUNET_free (ps);
319   }
320 }
321
322
323 /**
324  * Function called with sync responses from the service.
325  *
326  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
327  * @param msg message from the service
328  */
329 static void
330 handle_sync (void *cls,
331              const struct GNUNET_MessageHeader *msg)
332 {
333   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
334
335   /* we are in sync, notify callback */
336   pm->cb (pm->cb_cls,
337           NULL,
338           NULL,
339           NULL);
340 }
341
342
343 /**
344  * Generic error handler, called with the appropriate
345  * error code and the same closure specified at the creation of
346  * the message queue.
347  * Not every message queue implementation supports an error handler.
348  *
349  * @param cls closure with the `struct GNUNET_NSE_Handle *`
350  * @param error error code
351  */
352 static void
353 mq_error_handler (void *cls,
354                   enum GNUNET_MQ_Error error)
355 {
356   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
357
358   reconnect_plugin_ctx (pm);
359 }
360
361
362 /**
363  * Task run to re-establish the connection.
364  *
365  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
366  */
367 static void
368 do_plugin_connect (void *cls)
369 {
370   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
371   struct GNUNET_MQ_MessageHandler handlers[] = {
372     GNUNET_MQ_hd_var_size (event,
373                            GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT,
374                            struct TransportPluginMonitorMessage,
375                            pm),
376     GNUNET_MQ_hd_fixed_size (sync,
377                              GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC,
378                              struct GNUNET_MessageHeader,
379                              pm),
380     GNUNET_MQ_handler_end ()
381   };
382   struct GNUNET_MessageHeader *msg;
383   struct GNUNET_MQ_Envelope *env;
384
385   pm->reconnect_task = NULL;
386   pm->mq = GNUNET_CLIENT_connect (pm->cfg,
387                                   "transport",
388                                   handlers,
389                                   &mq_error_handler,
390                                   pm);
391   if (NULL == pm->mq)
392     return;
393   env = GNUNET_MQ_msg (msg,
394                        GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START);
395   GNUNET_MQ_send (pm->mq,
396                   env);
397 }
398
399
400 /**
401  * Install a plugin session state monitor callback.  The callback
402  * will be notified whenever the session changes.
403  *
404  * @param cfg configuration to use
405  * @param cb callback to invoke on events
406  * @param cb_cls closure for @a cb
407  * @return NULL on error, otherwise handle for cancellation
408  */
409 struct GNUNET_TRANSPORT_PluginMonitor *
410 GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg,
411                                   GNUNET_TRANSPORT_SessionMonitorCallback cb,
412                                   void *cb_cls)
413 {
414   struct GNUNET_TRANSPORT_PluginMonitor *pm;
415
416   pm = GNUNET_new (struct GNUNET_TRANSPORT_PluginMonitor);
417   pm->cb = cb;
418   pm->cb_cls = cb_cls;
419   pm->cfg = cfg;
420   do_plugin_connect (pm);
421   if (NULL == pm->mq)
422   {
423     GNUNET_free (pm);
424     return NULL;
425   }
426   pm->sessions = GNUNET_CONTAINER_multihashmap32_create (128);
427   return pm;
428 }
429
430
431 /**
432  * Cancel monitoring the plugin session state.  The callback will
433  * be called once for each session that is up with the information
434  * #GNUNET_TRANSPORT_SS_FINI (even though the session may stay up;
435  * this is just to enable client-side cleanup).
436  *
437  * @param pm handle of the request that is to be cancelled
438  */
439 void
440 GNUNET_TRANSPORT_monitor_plugins_cancel (struct GNUNET_TRANSPORT_PluginMonitor *pm)
441 {
442   if (NULL != pm->mq)
443   {
444     GNUNET_MQ_destroy (pm->mq);
445     pm->mq = NULL;
446   }
447   if (NULL != pm->reconnect_task)
448   {
449     GNUNET_SCHEDULER_cancel (pm->reconnect_task);
450     pm->reconnect_task = NULL;
451   }
452   GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
453                                            &free_entry,
454                                            pm);
455   GNUNET_CONTAINER_multihashmap32_destroy (pm->sessions);
456   GNUNET_free (pm);
457 }
458
459
460 /* end of transport_api_monitor_plugins.c */