Various changes:
[oweals/gnunet.git] / src / transport / transport_api_monitor_plugins.c
1 /*
2      This file is part of GNUnet.
3      (C) 2014 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/transport_api_monitor_plugins.c
23  * @brief montoring api for transport plugin session status
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_hello_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_transport_service.h"
32 #include "transport.h"
33
34
35 /**
36  * Handle for a plugin session state monitor.
37  */
38 struct GNUNET_TRANSPORT_PluginMonitor
39 {
40
41   /**
42    * Connection to the service.
43    */
44   struct GNUNET_CLIENT_Connection *client;
45
46   /**
47    * Our configuration.
48    */
49   const struct GNUNET_CONFIGURATION_Handle *cfg;
50
51   /**
52    * Callback to call.
53    */
54   GNUNET_TRANSPORT_SessionMonitorCallback cb;
55
56   /**
57    * Closure for @e cb
58    */
59   void *cb_cls;
60
61   /**
62    * Map of session_ids (reduced to 32-bits) to
63    * `struct GNUNET_TRANSPORT_PluginSession` objects.
64    */
65   struct GNUNET_CONTAINER_MultiHashMap32 *sessions;
66
67   /**
68    * Backoff for reconnect.
69    */
70   struct GNUNET_TIME_Relative backoff;
71
72   /**
73    * Task ID for reconnect.
74    */
75   struct GNUNET_SCHEDULER_Task * reconnect_task;
76
77 };
78
79
80 /**
81  * Abstract representation of a plugin's session.
82  * Corresponds to the `struct Session` within the TRANSPORT service.
83  */
84 struct GNUNET_TRANSPORT_PluginSession
85 {
86   /**
87    * Unique session identifier.
88    */
89   uint64_t session_id;
90
91   /**
92    * Location for the client to store "data".
93    */
94   void *client_ctx;
95 };
96
97
98 /**
99  * Function called with responses from the service.
100  *
101  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
102  * @param msg NULL on timeout or error, otherwise presumably a
103  *        message with the human-readable address
104  */
105 static void
106 response_processor (void *cls,
107                     const struct GNUNET_MessageHeader *msg);
108
109
110 /**
111  * Send our subscription request to the service.
112  *
113  * @param pal_ctx our context
114  */
115 static void
116 send_plugin_mon_request (struct GNUNET_TRANSPORT_PluginMonitor *pm)
117 {
118   struct GNUNET_MessageHeader msg;
119
120   msg.size = htons (sizeof (struct GNUNET_MessageHeader));
121   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START);
122   GNUNET_assert (GNUNET_OK ==
123                  GNUNET_CLIENT_transmit_and_get_response (pm->client,
124                                                           &msg,
125                                                           GNUNET_TIME_UNIT_FOREVER_REL,
126                                                           GNUNET_YES,
127                                                           &response_processor,
128                                                           pm));
129 }
130
131
132 /**
133  * Task run to re-establish the connection.
134  *
135  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
136  * @param tc scheduler context, unused
137  */
138 static void
139 do_plugin_connect (void *cls,
140                  const struct GNUNET_SCHEDULER_TaskContext *tc)
141 {
142   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
143
144   pm->reconnect_task = NULL;
145   pm->client = GNUNET_CLIENT_connect ("transport", pm->cfg);
146   GNUNET_assert (NULL != pm->client);
147   send_plugin_mon_request (pm);
148 }
149
150
151 /**
152  * Free the session entry and notify the callback about its demise.
153  *
154  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor`
155  * @param key key of the session in the map
156  * @param value the session to free
157  * @return #GNUNET_OK (continue to iterate)
158  */
159 static int
160 free_entry (void *cls,
161             uint32_t key,
162             void *value)
163 {
164   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
165   struct GNUNET_TRANSPORT_PluginSession *ps = value;
166
167   pm->cb (pm->cb_cls,
168           ps,
169           &ps->client_ctx,
170           NULL);
171   GNUNET_break (GNUNET_YES ==
172                 GNUNET_CONTAINER_multihashmap32_remove (pm->sessions,
173                                                         key,
174                                                         ps));
175   GNUNET_break (NULL == ps->client_ctx);
176   GNUNET_free (ps);
177   return GNUNET_OK;
178 }
179
180
181 /**
182  * We got disconnected, remove all existing entries from
183  * the map and notify client.
184  *
185  * @param pm montitor that got disconnected
186  */
187 static void
188 clear_map (struct GNUNET_TRANSPORT_PluginMonitor *pm)
189 {
190   GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
191                                            &free_entry,
192                                            pm);
193 }
194
195
196 /**
197  * Cut the existing connection and reconnect.
198  *
199  * @param pm our context
200  */
201 static void
202 reconnect_plugin_ctx (struct GNUNET_TRANSPORT_PluginMonitor *pm)
203 {
204   GNUNET_CLIENT_disconnect (pm->client);
205   pm->client = NULL;
206   clear_map (pm);
207   pm->backoff = GNUNET_TIME_STD_BACKOFF (pm->backoff);
208   pm->reconnect_task = GNUNET_SCHEDULER_add_delayed (pm->backoff,
209                                                      &do_plugin_connect,
210                                                      pm);
211 }
212
213
214 /**
215  * Convert 64-bit session ID to 32-bit index for hash map.
216  *
217  * @param id 64-bit session ID
218  * @return 32-bit hash map index
219  */
220 static uint32_t
221 wrap_id (uint64_t id)
222 {
223   return ((uint32_t) id) ^ ((uint32_t) (id >> 32));
224 }
225
226
227 /**
228  * Context for #locate_by_id().
229  */
230 struct SearchContext
231 {
232
233   /**
234    * Result.
235    */
236   struct GNUNET_TRANSPORT_PluginSession *ps;
237
238   /**
239    * ID to locate.
240    */
241   uint64_t session_id;
242
243 };
244
245
246 /**
247  * Locate a session entry.
248  *
249  * @param cls our `struct SearchContext`
250  * @param key key of the session in the map
251  * @param value a session
252  * @return #GNUNET_OK (continue to iterate), or #GNUNET_SYSERR (match found)
253  */
254 static int
255 locate_by_id (void *cls,
256               uint32_t key,
257               void *value)
258 {
259   struct SearchContext *sc = cls;
260   struct GNUNET_TRANSPORT_PluginSession *ps = value;
261
262   if (sc->session_id == ps->session_id)
263   {
264     sc->ps = ps;
265     return GNUNET_SYSERR;
266   }
267   return GNUNET_OK;
268 }
269
270
271 /**
272  * Function called with responses from the service.
273  *
274  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
275  * @param msg NULL on timeout or error, otherwise presumably a
276  *        message with the human-readable address
277  */
278 static void
279 response_processor (void *cls,
280                     const struct GNUNET_MessageHeader *msg)
281 {
282   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
283   const struct TransportPluginMonitorMessage *tpmm;
284   struct GNUNET_TRANSPORT_PluginSession *ps;
285   const char *pname;
286   const void *paddr;
287   enum GNUNET_TRANSPORT_SessionState ss;
288   size_t pname_len;
289   size_t paddr_len;
290   struct GNUNET_TRANSPORT_SessionInfo info;
291   struct GNUNET_HELLO_Address addr;
292   struct SearchContext rv;
293
294   if (NULL == msg)
295   {
296     reconnect_plugin_ctx (pm);
297     return;
298   }
299   if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC == ntohs (msg->type)) &&
300        (sizeof (struct GNUNET_MessageHeader) == ntohs (msg->size)) )
301   {
302     /* we are in sync */
303     pm->cb (pm->cb_cls,
304             NULL,
305             NULL,
306             NULL);
307     GNUNET_CLIENT_receive (pm->client,
308                            &response_processor,
309                            pm,
310                            GNUNET_TIME_UNIT_FOREVER_REL);
311     return;
312   }
313
314   if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT != ntohs (msg->type)) ||
315        (sizeof (struct TransportPluginMonitorMessage) > ntohs (msg->size)) )
316   {
317     GNUNET_break (0);
318     reconnect_plugin_ctx (pm);
319     return;
320   }
321   tpmm = (const struct TransportPluginMonitorMessage *) msg;
322   pname = (const char *) &tpmm[1];
323   pname_len = ntohs (tpmm->plugin_name_len);
324   paddr_len = ntohs (tpmm->plugin_address_len);
325   if ( (pname_len +
326         paddr_len +
327         sizeof (struct TransportPluginMonitorMessage) != ntohs (msg->size)) ||
328        ( (0 != pname_len) &&
329          ('\0' != pname[pname_len - 1]) ) )
330   {
331     GNUNET_break (0);
332     reconnect_plugin_ctx (pm);
333     return;
334   }
335   paddr = &pname[pname_len];
336   ps = NULL;
337   ss = (enum GNUNET_TRANSPORT_SessionState) ntohs (tpmm->session_state);
338   if (GNUNET_TRANSPORT_SS_INIT == ss)
339   {
340     ps = GNUNET_new (struct GNUNET_TRANSPORT_PluginSession);
341     ps->session_id = tpmm->session_id;
342     (void) GNUNET_CONTAINER_multihashmap32_put (pm->sessions,
343                                                 wrap_id (tpmm->session_id),
344                                                 ps,
345                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
346
347   }
348   else
349   {
350     rv.session_id = tpmm->session_id;
351     rv.ps = NULL;
352     (void) GNUNET_CONTAINER_multihashmap32_get_multiple (pm->sessions,
353                                                          wrap_id (tpmm->session_id),
354                                                          &locate_by_id,
355                                                          &rv);
356     ps = rv.ps;
357     if (NULL == ps)
358     {
359       GNUNET_break (0);
360       reconnect_plugin_ctx (pm);
361       return;
362     }
363   }
364   info.state = ss;
365   info.is_inbound = (int16_t) ntohs (tpmm->is_inbound);
366   info.num_msg_pending = ntohl (tpmm->msgs_pending);
367   info.num_bytes_pending = ntohl (tpmm->bytes_pending);
368   info.receive_delay = GNUNET_TIME_absolute_ntoh (tpmm->delay);
369   info.session_timeout = GNUNET_TIME_absolute_ntoh (tpmm->timeout);
370   info.address = &addr;
371   addr.peer = tpmm->peer;
372   addr.address = (0 == paddr_len) ? NULL : paddr;
373   addr.address_length = paddr_len;
374   addr.transport_name = (0 == pname_len) ? NULL : pname;
375   addr.local_info = GNUNET_HELLO_ADDRESS_INFO_NONE;
376   pm->cb (pm->cb_cls,
377           ps,
378           &ps->client_ctx,
379           &info);
380
381   if (GNUNET_TRANSPORT_SS_DONE == ss)
382   {
383     GNUNET_break (NULL == ps->client_ctx);
384     GNUNET_assert (GNUNET_YES ==
385                    GNUNET_CONTAINER_multihashmap32_remove (pm->sessions,
386                                                            wrap_id (tpmm->session_id),
387                                                            ps));
388     GNUNET_free (ps);
389   }
390   GNUNET_CLIENT_receive (pm->client,
391                          &response_processor,
392                          pm,
393                          GNUNET_TIME_UNIT_FOREVER_REL);
394 }
395
396
397 /**
398  * Install a plugin session state monitor callback.  The callback
399  * will be notified whenever the session changes.
400  *
401  * @param cfg configuration to use
402  * @param cb callback to invoke on events
403  * @param cb_cls closure for @a cb
404  * @return NULL on error, otherwise handle for cancellation
405  */
406 struct GNUNET_TRANSPORT_PluginMonitor *
407 GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg,
408                                   GNUNET_TRANSPORT_SessionMonitorCallback cb,
409                                   void *cb_cls)
410 {
411   struct GNUNET_TRANSPORT_PluginMonitor *pm;
412   struct GNUNET_CLIENT_Connection *client;
413
414   client = GNUNET_CLIENT_connect ("transport",
415                                   cfg);
416   if (NULL == client)
417     return NULL;
418   pm = GNUNET_new (struct GNUNET_TRANSPORT_PluginMonitor);
419   pm->cb = cb;
420   pm->cb_cls = cb_cls;
421   pm->cfg = cfg;
422   pm->client = client;
423   pm->sessions = GNUNET_CONTAINER_multihashmap32_create (128);
424   send_plugin_mon_request (pm);
425   return pm;
426 }
427
428
429 /**
430  * Cancel monitoring the plugin session state.  The callback will
431  * be called once for each session that is up with the information
432  * #GNUNET_TRANSPORT_SS_FINI (even though the session may stay up;
433  * this is just to enable client-side cleanup).
434  *
435  * @param pm handle of the request that is to be cancelled
436  */
437 void
438 GNUNET_TRANSPORT_monitor_plugins_cancel (struct GNUNET_TRANSPORT_PluginMonitor *pm)
439 {
440   if (NULL != pm->client)
441   {
442     GNUNET_CLIENT_disconnect (pm->client);
443     pm->client = NULL;
444   }
445   if (NULL != pm->reconnect_task)
446   {
447     GNUNET_SCHEDULER_cancel (pm->reconnect_task);
448     pm->reconnect_task = NULL;
449   }
450   clear_map (pm);
451   GNUNET_CONTAINER_multihashmap32_destroy (pm->sessions);
452   GNUNET_free (pm);
453 }
454
455
456 /* end of transport_api_monitor_plugins.c */