plugin datastore mysql
[oweals/gnunet.git] / src / transport / transport_api_monitor_plugins.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2014 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file transport/transport_api_monitor_plugins.c
23  * @brief montoring api for transport plugin session status
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_hello_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_transport_service.h"
32 #include "transport.h"
33
34
35 /**
36  * Handle for a plugin session state monitor.
37  */
38 struct GNUNET_TRANSPORT_PluginMonitor
39 {
40
41   /**
42    * Connection to the service.
43    */
44   struct GNUNET_CLIENT_Connection *client;
45
46   /**
47    * Our configuration.
48    */
49   const struct GNUNET_CONFIGURATION_Handle *cfg;
50
51   /**
52    * Callback to call.
53    */
54   GNUNET_TRANSPORT_SessionMonitorCallback cb;
55
56   /**
57    * Closure for @e cb
58    */
59   void *cb_cls;
60
61   /**
62    * Map of session_ids (reduced to 32-bits) to
63    * `struct GNUNET_TRANSPORT_PluginSession` objects.
64    */
65   struct GNUNET_CONTAINER_MultiHashMap32 *sessions;
66
67   /**
68    * Backoff for reconnect.
69    */
70   struct GNUNET_TIME_Relative backoff;
71
72   /**
73    * Task ID for reconnect.
74    */
75   struct GNUNET_SCHEDULER_Task * reconnect_task;
76
77 };
78
79
80 /**
81  * Abstract representation of a plugin's session.
82  * Corresponds to the `struct GNUNET_ATS_Session` within the TRANSPORT service.
83  */
84 struct GNUNET_TRANSPORT_PluginSession
85 {
86   /**
87    * Unique session identifier.
88    */
89   uint64_t session_id;
90
91   /**
92    * Location for the client to store "data".
93    */
94   void *client_ctx;
95 };
96
97
98 /**
99  * Function called with responses from the service.
100  *
101  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
102  * @param msg NULL on timeout or error, otherwise presumably a
103  *        message with the human-readable address
104  */
105 static void
106 response_processor (void *cls,
107                     const struct GNUNET_MessageHeader *msg);
108
109
110 /**
111  * Send our subscription request to the service.
112  *
113  * @param pal_ctx our context
114  */
115 static void
116 send_plugin_mon_request (struct GNUNET_TRANSPORT_PluginMonitor *pm)
117 {
118   struct GNUNET_MessageHeader msg;
119
120   msg.size = htons (sizeof (struct GNUNET_MessageHeader));
121   msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START);
122   GNUNET_assert (GNUNET_OK ==
123                  GNUNET_CLIENT_transmit_and_get_response (pm->client,
124                                                           &msg,
125                                                           GNUNET_TIME_UNIT_FOREVER_REL,
126                                                           GNUNET_YES,
127                                                           &response_processor,
128                                                           pm));
129 }
130
131
132 /**
133  * Task run to re-establish the connection.
134  *
135  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
136  */
137 static void
138 do_plugin_connect (void *cls)
139 {
140   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
141
142   pm->reconnect_task = NULL;
143   pm->client = GNUNET_CLIENT_connect ("transport", pm->cfg);
144   GNUNET_assert (NULL != pm->client);
145   send_plugin_mon_request (pm);
146 }
147
148
149 /**
150  * Free the session entry and notify the callback about its demise.
151  *
152  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor`
153  * @param key key of the session in the map
154  * @param value the session to free
155  * @return #GNUNET_OK (continue to iterate)
156  */
157 static int
158 free_entry (void *cls,
159             uint32_t key,
160             void *value)
161 {
162   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
163   struct GNUNET_TRANSPORT_PluginSession *ps = value;
164
165   pm->cb (pm->cb_cls,
166           ps,
167           &ps->client_ctx,
168           NULL);
169   GNUNET_break (GNUNET_YES ==
170                 GNUNET_CONTAINER_multihashmap32_remove (pm->sessions,
171                                                         key,
172                                                         ps));
173   GNUNET_break (NULL == ps->client_ctx);
174   GNUNET_free (ps);
175   return GNUNET_OK;
176 }
177
178
179 /**
180  * Cut the existing connection and reconnect.
181  *
182  * @param pm our context
183  */
184 static void
185 reconnect_plugin_ctx (struct GNUNET_TRANSPORT_PluginMonitor *pm)
186 {
187   GNUNET_CLIENT_disconnect (pm->client);
188   pm->client = NULL;
189   GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
190                                            &free_entry,
191                                            pm);
192   pm->backoff = GNUNET_TIME_STD_BACKOFF (pm->backoff);
193   pm->reconnect_task = GNUNET_SCHEDULER_add_delayed (pm->backoff,
194                                                      &do_plugin_connect,
195                                                      pm);
196 }
197
198
199 /**
200  * Convert 64-bit session ID to 32-bit index for hash map.
201  *
202  * @param id 64-bit session ID
203  * @return 32-bit hash map index
204  */
205 static uint32_t
206 wrap_id (uint64_t id)
207 {
208   return ((uint32_t) id) ^ ((uint32_t) (id >> 32));
209 }
210
211
212 /**
213  * Context for #locate_by_id().
214  */
215 struct SearchContext
216 {
217
218   /**
219    * Result.
220    */
221   struct GNUNET_TRANSPORT_PluginSession *ps;
222
223   /**
224    * ID to locate.
225    */
226   uint64_t session_id;
227
228 };
229
230
231 /**
232  * Locate a session entry.
233  *
234  * @param cls our `struct SearchContext`
235  * @param key key of the session in the map
236  * @param value a session
237  * @return #GNUNET_OK (continue to iterate), or #GNUNET_SYSERR (match found)
238  */
239 static int
240 locate_by_id (void *cls,
241               uint32_t key,
242               void *value)
243 {
244   struct SearchContext *sc = cls;
245   struct GNUNET_TRANSPORT_PluginSession *ps = value;
246
247   if (sc->session_id == ps->session_id)
248   {
249     sc->ps = ps;
250     return GNUNET_SYSERR;
251   }
252   return GNUNET_OK;
253 }
254
255
256 /**
257  * Function called with responses from the service.
258  *
259  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
260  * @param msg NULL on timeout or error, otherwise presumably a
261  *        message with the human-readable address
262  */
263 static void
264 response_processor (void *cls,
265                     const struct GNUNET_MessageHeader *msg)
266 {
267   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
268   const struct TransportPluginMonitorMessage *tpmm;
269   struct GNUNET_TRANSPORT_PluginSession *ps;
270   const char *pname;
271   const void *paddr;
272   enum GNUNET_TRANSPORT_SessionState ss;
273   size_t pname_len;
274   size_t paddr_len;
275   struct GNUNET_TRANSPORT_SessionInfo info;
276   struct GNUNET_HELLO_Address addr;
277   struct SearchContext rv;
278
279   if (NULL == msg)
280   {
281     reconnect_plugin_ctx (pm);
282     return;
283   }
284   if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC == ntohs (msg->type)) &&
285        (sizeof (struct GNUNET_MessageHeader) == ntohs (msg->size)) )
286   {
287     /* we are in sync */
288     pm->cb (pm->cb_cls,
289             NULL,
290             NULL,
291             NULL);
292     GNUNET_CLIENT_receive (pm->client,
293                            &response_processor,
294                            pm,
295                            GNUNET_TIME_UNIT_FOREVER_REL);
296     return;
297   }
298
299   if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT != ntohs (msg->type)) ||
300        (sizeof (struct TransportPluginMonitorMessage) > ntohs (msg->size)) )
301   {
302     GNUNET_break (0);
303     reconnect_plugin_ctx (pm);
304     return;
305   }
306   tpmm = (const struct TransportPluginMonitorMessage *) msg;
307   pname = (const char *) &tpmm[1];
308   pname_len = ntohs (tpmm->plugin_name_len);
309   paddr_len = ntohs (tpmm->plugin_address_len);
310   if ( (pname_len +
311         paddr_len +
312         sizeof (struct TransportPluginMonitorMessage) != ntohs (msg->size)) ||
313        ( (0 != pname_len) &&
314          ('\0' != pname[pname_len - 1]) ) )
315   {
316     GNUNET_break (0);
317     reconnect_plugin_ctx (pm);
318     return;
319   }
320   paddr = &pname[pname_len];
321   ps = NULL;
322   ss = (enum GNUNET_TRANSPORT_SessionState) ntohs (tpmm->session_state);
323   if (GNUNET_TRANSPORT_SS_INIT == ss)
324   {
325     ps = GNUNET_new (struct GNUNET_TRANSPORT_PluginSession);
326     ps->session_id = tpmm->session_id;
327     (void) GNUNET_CONTAINER_multihashmap32_put (pm->sessions,
328                                                 wrap_id (tpmm->session_id),
329                                                 ps,
330                                                 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
331
332   }
333   else
334   {
335     rv.session_id = tpmm->session_id;
336     rv.ps = NULL;
337     (void) GNUNET_CONTAINER_multihashmap32_get_multiple (pm->sessions,
338                                                          wrap_id (tpmm->session_id),
339                                                          &locate_by_id,
340                                                          &rv);
341     ps = rv.ps;
342     if (NULL == ps)
343     {
344       GNUNET_break (0);
345       reconnect_plugin_ctx (pm);
346       return;
347     }
348   }
349   info.state = ss;
350   info.is_inbound = (int16_t) ntohs (tpmm->is_inbound);
351   info.num_msg_pending = ntohl (tpmm->msgs_pending);
352   info.num_bytes_pending = ntohl (tpmm->bytes_pending);
353   info.receive_delay = GNUNET_TIME_absolute_ntoh (tpmm->delay);
354   info.session_timeout = GNUNET_TIME_absolute_ntoh (tpmm->timeout);
355   info.address = &addr;
356   addr.peer = tpmm->peer;
357   addr.address = (0 == paddr_len) ? NULL : paddr;
358   addr.address_length = paddr_len;
359   addr.transport_name = (0 == pname_len) ? NULL : pname;
360   addr.local_info = GNUNET_HELLO_ADDRESS_INFO_NONE;
361   pm->cb (pm->cb_cls,
362           ps,
363           &ps->client_ctx,
364           &info);
365
366   if (GNUNET_TRANSPORT_SS_DONE == ss)
367   {
368     GNUNET_break (NULL == ps->client_ctx);
369     GNUNET_assert (GNUNET_YES ==
370                    GNUNET_CONTAINER_multihashmap32_remove (pm->sessions,
371                                                            wrap_id (tpmm->session_id),
372                                                            ps));
373     GNUNET_free (ps);
374   }
375   GNUNET_CLIENT_receive (pm->client,
376                          &response_processor,
377                          pm,
378                          GNUNET_TIME_UNIT_FOREVER_REL);
379 }
380
381
382 /**
383  * Install a plugin session state monitor callback.  The callback
384  * will be notified whenever the session changes.
385  *
386  * @param cfg configuration to use
387  * @param cb callback to invoke on events
388  * @param cb_cls closure for @a cb
389  * @return NULL on error, otherwise handle for cancellation
390  */
391 struct GNUNET_TRANSPORT_PluginMonitor *
392 GNUNET_TRANSPORT_monitor_plugins (const struct GNUNET_CONFIGURATION_Handle *cfg,
393                                   GNUNET_TRANSPORT_SessionMonitorCallback cb,
394                                   void *cb_cls)
395 {
396   struct GNUNET_TRANSPORT_PluginMonitor *pm;
397   struct GNUNET_CLIENT_Connection *client;
398
399   client = GNUNET_CLIENT_connect ("transport",
400                                   cfg);
401   if (NULL == client)
402     return NULL;
403   pm = GNUNET_new (struct GNUNET_TRANSPORT_PluginMonitor);
404   pm->cb = cb;
405   pm->cb_cls = cb_cls;
406   pm->cfg = cfg;
407   pm->client = client;
408   pm->sessions = GNUNET_CONTAINER_multihashmap32_create (128);
409   send_plugin_mon_request (pm);
410   return pm;
411 }
412
413
414 /**
415  * Cancel monitoring the plugin session state.  The callback will
416  * be called once for each session that is up with the information
417  * #GNUNET_TRANSPORT_SS_FINI (even though the session may stay up;
418  * this is just to enable client-side cleanup).
419  *
420  * @param pm handle of the request that is to be cancelled
421  */
422 void
423 GNUNET_TRANSPORT_monitor_plugins_cancel (struct GNUNET_TRANSPORT_PluginMonitor *pm)
424 {
425   if (NULL != pm->client)
426   {
427     GNUNET_CLIENT_disconnect (pm->client);
428     pm->client = NULL;
429   }
430   if (NULL != pm->reconnect_task)
431   {
432     GNUNET_SCHEDULER_cancel (pm->reconnect_task);
433     pm->reconnect_task = NULL;
434   }
435   GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
436                                            &free_entry,
437                                            pm);
438   GNUNET_CONTAINER_multihashmap32_destroy (pm->sessions);
439   GNUNET_free (pm);
440 }
441
442
443 /* end of transport_api_monitor_plugins.c */