sensor: towards reporting to collection point
[oweals/gnunet.git] / src / sensor / gnunet-service-sensor-reporting.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file sensor/gnunet-service-sensor-reporting.c
23  * @brief sensor service reporting functionality
24  * @author Omar Tarabai
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "sensor.h"
29 #include "gnunet_peerstore_service.h"
30 #include "gnunet_cadet_service.h"
31 #include "gnunet_applications.h"
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
34
35 /**
36  * Context of reporting operations
37  */
38 struct ReportingContext
39 {
40
41   /**
42    * DLL
43    */
44   struct ReportingContext *prev;
45
46   /**
47    * DLL
48    */
49   struct ReportingContext *next;
50
51   /**
52    * Sensor information
53    */
54   struct SensorInfo *sensor;
55
56   /**
57    * Collection point reporting task
58    * (OR GNUNET_SCHEDULER_NO_TASK)
59    */
60   GNUNET_SCHEDULER_TaskIdentifier cp_task;
61
62   /**
63    * Watcher of sensor values
64    */
65   struct GNUNET_PEERSTORE_WatchContext *wc;
66
67   /**
68    * Last value read from sensor
69    */
70   void *last_value;
71
72   /**
73    * Size of @last_value
74    */
75   size_t last_value_size;
76
77   /**
78    * Incremented with every lock request
79    * (e.g. to send last value).
80    * Change @last_value only when @value_lock = 0
81    */
82   int value_lock;
83
84 };
85
86 /**
87  * Context of a created CADET channel
88  */
89 struct CadetChannelContext
90 {
91
92   /**
93    * DLL
94    */
95   struct CadetChannelContext *prev;
96
97   /**
98    * DLL
99    */
100   struct CadetChannelContext *next;
101
102   /**
103    * Peer Id of
104    */
105   struct GNUNET_PeerIdentity pid;
106
107   /**
108    * CADET channel handle
109    */
110   struct GNUNET_CADET_Channel *c;
111
112   /**
113    * Are we sending data on this channel?
114    * #GNUNET_YES / #GNUNET_NO
115    */
116   int sending;
117
118 };
119
120 /**
121  * Our configuration.
122  */
123 static const struct GNUNET_CONFIGURATION_Handle *cfg;
124
125 /**
126  * Handle to peerstore service
127  */
128 static struct GNUNET_PEERSTORE_Handle *peerstore;
129
130 /**
131  * My peer id
132  */
133 static struct GNUNET_PeerIdentity mypeerid;
134
135 /**
136  * Handle to CADET service
137  */
138 static struct GNUNET_CADET_Handle *cadet;
139
140 /**
141  * Head of DLL of all reporting contexts
142  */
143 struct ReportingContext *rc_head;
144
145 /**
146  * Tail of DLL of all reporting contexts
147  */
148 struct ReportingContext *rc_tail;
149
150 /**
151  * Head of DLL of all cadet channels
152  */
153 struct CadetChannelContext *cc_head;
154
155 /**
156  * Tail of DLL of all cadet channels
157  */
158 struct CadetChannelContext *cc_tail;
159
160
161 /**
162  * Destroy a reporting context structure
163  */
164 static void
165 destroy_reporting_context (struct ReportingContext *rc)
166 {
167   if (NULL != rc->wc)
168   {
169     GNUNET_PEERSTORE_watch_cancel (rc->wc);
170     rc->wc = NULL;
171   }
172   if (GNUNET_SCHEDULER_NO_TASK != rc->cp_task)
173   {
174     GNUNET_SCHEDULER_cancel(rc->cp_task);
175     rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
176   }
177   if (NULL != rc->last_value)
178   {
179     GNUNET_free (rc->last_value);
180     rc->last_value_size = 0;
181   }
182   GNUNET_free(rc);
183 }
184
185 /**
186  * Stop sensor reporting module
187  */
188 void SENSOR_reporting_stop ()
189 {
190   struct ReportingContext *rc;
191
192   LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor reporting module.\n");
193   /* TODO: destroy cadet channels */
194   while (NULL != rc_head)
195   {
196     rc = rc_head;
197     GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, rc);
198     destroy_reporting_context (rc);
199   }
200   if (NULL != peerstore)
201   {
202     GNUNET_PEERSTORE_disconnect (peerstore);
203     peerstore = NULL;
204   }
205   if (NULL != cadet)
206   {
207     GNUNET_CADET_disconnect (cadet);
208     cadet = NULL;
209   }
210 }
211
212 /**
213  * Returns CADET channel established to given peer
214  * or creates a new one
215  *
216  * @param pid Peer Identity
217  * @return Context of established cadet channel
218  */
219 struct CadetChannelContext *
220 get_cadet_channel (struct GNUNET_PeerIdentity pid)
221 {
222   struct CadetChannelContext *cc;
223
224   cc = cc_head;
225   while (NULL != cc)
226   {
227     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cc->pid))
228       return cc;
229     cc = cc->next;
230   }
231   cc = GNUNET_new (struct CadetChannelContext);
232   cc->c = GNUNET_CADET_channel_create(cadet,
233       cc,
234       &pid,
235       GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
236       GNUNET_CADET_OPTION_DEFAULT);
237   cc->pid = pid;
238   cc->sending = GNUNET_NO;
239   GNUNET_CONTAINER_DLL_insert (cc_head, cc_tail, cc);
240   return cc;
241 }
242
243 /**
244  * Function called to notify a client about the connection begin ready
245  * to queue more data.  @a buf will be NULL and @a size zero if the
246  * connection was closed for writing in the meantime.
247  *
248  * @param cls closure
249  * @param size number of bytes available in @a buf
250  * @param buf where the callee should write the message
251  * @return number of bytes written to @a buf
252  */
253 size_t
254 do_report_collection_point (void *cls, size_t size, void *buf)
255 {
256   /* TODO: check error from CADET */
257   /* TODO: do transfer */
258   /* TODO: cc->sending, rc->value_lock */
259   return 0;
260 }
261
262 /**
263  * Task scheduled to send values to collection point
264  *
265  * @param cls closure, a 'struct CollectionReportingContext *'
266  * @param tc unused
267  */
268 void report_collection_point
269 (void *cls, const struct GNUNET_SCHEDULER_TaskContext* tc)
270 {
271   struct ReportingContext *rc = cls;
272   struct SensorInfo *sensor = rc->sensor;
273   struct CadetChannelContext *cc;
274
275   rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
276   GNUNET_assert (NULL != sensor->collection_point);
277   cc = get_cadet_channel (*sensor->collection_point);
278   if (GNUNET_YES == cc->sending)
279   {
280     LOG (GNUNET_ERROR_TYPE_DEBUG,
281         "Cadet channel to collection point busy, trying again on next interval.");
282     rc->cp_task =
283         GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
284             &report_collection_point,
285             rc);
286     return;
287   }
288   cc->sending = GNUNET_YES;
289   rc->value_lock ++;
290   /* TODO: construct message */
291   /* TODO: if constructed message is added to cc, no need for rc->value_lock */
292   GNUNET_CADET_notify_transmit_ready (cc->c,
293       GNUNET_YES,
294       sensor->collection_interval,
295       rc->last_value_size, /* FIXME: size of constructed message */
296       &do_report_collection_point,
297       rc);
298   /* TODO */
299   /* TODO: reschedule reporting */
300 }
301
302 /*
303  * Sensor value watch callback
304  */
305 static int
306 sensor_watch_cb (void *cls,
307     struct GNUNET_PEERSTORE_Record *record,
308     char *emsg)
309 {
310   struct ReportingContext *rc = cls;
311
312   if (NULL != emsg)
313     return GNUNET_YES;
314   if (rc->value_lock > 0)
315   {
316     LOG (GNUNET_ERROR_TYPE_DEBUG,
317         "Did not update reporting context of sensor `%s'"
318         " because value is locked for sending.",
319         rc->sensor->name);
320     return GNUNET_YES;
321   }
322   if (NULL != rc->last_value)
323   {
324     GNUNET_free (rc->last_value);
325     rc->last_value_size = 0;
326   }
327   rc->last_value = GNUNET_malloc(record->value_size);
328   memcpy (rc->last_value, record->value, record->value_size);
329   rc->last_value_size = record->value_size;
330   return GNUNET_YES;
331 }
332
333 /**
334  * Iterator for defined sensors
335  * Watches sensors for readings to report
336  *
337  * @param cls unused
338  * @param key unused
339  * @param value a 'struct SensorInfo *' with sensor information
340  * @return #GNUNET_YES to continue iterations
341  */
342 static int
343 init_sensor_reporting (void *cls,
344     const struct GNUNET_HashCode *key,
345     void *value)
346 {
347   struct SensorInfo *sensor = value;
348   struct ReportingContext *rc;
349
350   if (NULL == sensor->collection_point &&
351       GNUNET_NO == sensor->p2p_report)
352     return GNUNET_YES;
353   rc = GNUNET_new (struct ReportingContext);
354   rc->sensor = sensor;
355   rc->last_value = NULL;
356   rc->last_value_size = 0;
357   rc->value_lock = 0;
358   rc->wc = GNUNET_PEERSTORE_watch(peerstore,
359       "sensor",
360       &mypeerid,
361       sensor->name,
362       &sensor_watch_cb,
363       rc);
364   if (NULL != sensor->collection_point)
365   {
366     LOG (GNUNET_ERROR_TYPE_INFO,
367         "Will start reporting sensor `%s' values to collection point `%s' every %s.\n",
368         sensor->name, GNUNET_i2s_full(sensor->collection_point),
369         GNUNET_STRINGS_relative_time_to_string(sensor->collection_interval, GNUNET_YES));
370     rc->cp_task =
371         GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
372             &report_collection_point,
373             rc);
374   }
375   if (GNUNET_YES == sensor->p2p_report)
376   {
377     LOG (GNUNET_ERROR_TYPE_INFO,
378         "Will start reporting sensor `%s' values to p2p network every %s.\n",
379         sensor->name,
380         GNUNET_STRINGS_relative_time_to_string(sensor->p2p_interval, GNUNET_YES));
381   }
382   GNUNET_CONTAINER_DLL_insert (rc_head, rc_tail, rc);
383   return GNUNET_YES;
384 }
385
386 /**
387  * Function called whenever a channel is destroyed.  Should clean up
388  * any associated state.
389  *
390  * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
391  *
392  * @param cls closure (set from #GNUNET_CADET_connect)
393  * @param channel connection to the other end (henceforth invalid)
394  * @param channel_ctx place where local state associated
395  *                   with the channel is stored
396  */
397 static void cadet_channel_destroyed (void *cls,
398     const struct GNUNET_CADET_Channel *channel,
399     void *channel_ctx)
400 {
401   struct CadetChannelContext *cc = channel_ctx;
402
403   GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
404   GNUNET_free (cc);
405 }
406
407 /**
408  * Start the sensor reporting module
409  *
410  * @param c our service configuration
411  * @param sensors multihashmap of loaded sensors
412  * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
413  */
414 int
415 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
416     struct GNUNET_CONTAINER_MultiHashMap *sensors)
417 {
418   static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
419       {NULL, 0, 0}
420   };
421
422   GNUNET_assert(NULL != sensors);
423   cfg = c;
424   GNUNET_CRYPTO_get_peer_identity(cfg, &mypeerid);
425   GNUNET_CONTAINER_multihashmap_iterate(sensors, &init_sensor_reporting, NULL);
426   peerstore = GNUNET_PEERSTORE_connect(cfg);
427   if (NULL == peerstore)
428   {
429     LOG (GNUNET_ERROR_TYPE_ERROR,
430         _("Failed to connect to peerstore service.\n"));
431     SENSOR_reporting_stop ();
432     return GNUNET_SYSERR;
433   }
434   cadet = GNUNET_CADET_connect(cfg,
435       NULL,
436       NULL,
437       &cadet_channel_destroyed,
438       cadet_handlers,
439       NULL);
440   if (NULL == cadet)
441   {
442     LOG (GNUNET_ERROR_TYPE_ERROR,
443         _("Failed to connect to CADET service.\n"));
444     SENSOR_reporting_stop ();
445     return GNUNET_SYSERR;
446   }
447
448   return GNUNET_OK;
449 }
450
451 /* end of gnunet-service-sensor-reporting.c */