moved common sensor functionality to a util lib
[oweals/gnunet.git] / src / sensor / gnunet-service-sensor-reporting.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file sensor/gnunet-service-sensor-reporting.c
23  * @brief sensor service reporting functionality
24  * @author Omar Tarabai
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "sensor.h"
29 #include "gnunet_peerstore_service.h"
30 #include "gnunet_cadet_service.h"
31 #include "gnunet_applications.h"
32
33 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
34
35 /**
36  * Context of reporting operations
37  */
38 struct ReportingContext
39 {
40
41   /**
42    * DLL
43    */
44   struct ReportingContext *prev;
45
46   /**
47    * DLL
48    */
49   struct ReportingContext *next;
50
51   /**
52    * Sensor information
53    */
54   struct SensorInfo *sensor;
55
56   /**
57    * Collection point reporting task
58    * (OR GNUNET_SCHEDULER_NO_TASK)
59    */
60   GNUNET_SCHEDULER_TaskIdentifier cp_task;
61
62   /**
63    * Watcher of sensor values
64    */
65   struct GNUNET_PEERSTORE_WatchContext *wc;
66
67   /**
68    * Last value read from sensor
69    */
70   void *last_value;
71
72   /**
73    * Size of @last_value
74    */
75   size_t last_value_size;
76
77   /**
78    * Timestamp of last value reading
79    */
80   uint64_t timestamp;
81
82 };
83
84 /**
85  * Context of a created CADET channel
86  */
87 struct CadetChannelContext
88 {
89
90   /**
91    * DLL
92    */
93   struct CadetChannelContext *prev;
94
95   /**
96    * DLL
97    */
98   struct CadetChannelContext *next;
99
100   /**
101    * Peer Id of
102    */
103   struct GNUNET_PeerIdentity pid;
104
105   /**
106    * CADET channel handle
107    */
108   struct GNUNET_CADET_Channel *c;
109
110   /**
111    * Are we sending data on this channel?
112    * #GNUNET_YES / #GNUNET_NO
113    */
114   int sending;
115
116   /**
117    * Pointer to a pending message to be sent over the channel
118    */
119   void *pending_msg;
120
121   /**
122    * Size of @pending_msg
123    */
124   size_t pending_msg_size;
125
126   /**
127    * Handle to CADET tranmission request in case we are sending
128    * (sending == GNUNET_YES)
129    */
130   struct GNUNET_CADET_TransmitHandle *th;
131
132 };
133
134 /**
135  * Our configuration.
136  */
137 static const struct GNUNET_CONFIGURATION_Handle *cfg;
138
139 /**
140  * Handle to peerstore service
141  */
142 static struct GNUNET_PEERSTORE_Handle *peerstore;
143
144 /**
145  * My peer id
146  */
147 static struct GNUNET_PeerIdentity mypeerid;
148
149 /**
150  * Handle to CADET service
151  */
152 static struct GNUNET_CADET_Handle *cadet;
153
154 /**
155  * Head of DLL of all reporting contexts
156  */
157 struct ReportingContext *rc_head;
158
159 /**
160  * Tail of DLL of all reporting contexts
161  */
162 struct ReportingContext *rc_tail;
163
164 /**
165  * Head of DLL of all cadet channels
166  */
167 struct CadetChannelContext *cc_head;
168
169 /**
170  * Tail of DLL of all cadet channels
171  */
172 struct CadetChannelContext *cc_tail;
173
174
175 /**
176  * Destroy a reporting context structure
177  */
178 static void
179 destroy_reporting_context (struct ReportingContext *rc)
180 {
181   if (NULL != rc->wc)
182   {
183     GNUNET_PEERSTORE_watch_cancel (rc->wc);
184     rc->wc = NULL;
185   }
186   if (GNUNET_SCHEDULER_NO_TASK != rc->cp_task)
187   {
188     GNUNET_SCHEDULER_cancel(rc->cp_task);
189     rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
190   }
191   if (NULL != rc->last_value)
192   {
193     GNUNET_free (rc->last_value);
194     rc->last_value_size = 0;
195   }
196   GNUNET_free(rc);
197 }
198
199 /**
200  * Destroy a CADET channel context struct
201  */
202 static void
203 destroy_cadet_channel_context (struct CadetChannelContext *cc)
204 {
205   if (NULL != cc->th)
206   {
207     GNUNET_CADET_notify_transmit_ready_cancel (cc->th);
208     cc->th = NULL;
209   }
210   if (NULL != cc->pending_msg)
211   {
212     GNUNET_free (cc->pending_msg);
213     cc->pending_msg = NULL;
214   }
215   if (NULL != cc->c)
216   {
217     GNUNET_CADET_channel_destroy (cc->c);
218     cc->c = NULL;
219   }
220   GNUNET_free (cc);
221 }
222
223 /**
224  * Stop sensor reporting module
225  */
226 void SENSOR_reporting_stop ()
227 {
228   struct ReportingContext *rc;
229   struct CadetChannelContext *cc;
230
231   LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor reporting module.\n");
232   while (NULL != cc_head)
233   {
234     cc = cc_head;
235     GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
236     destroy_cadet_channel_context (cc);
237   }
238   while (NULL != rc_head)
239   {
240     rc = rc_head;
241     GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, rc);
242     destroy_reporting_context (rc);
243   }
244   if (NULL != peerstore)
245   {
246     GNUNET_PEERSTORE_disconnect (peerstore);
247     peerstore = NULL;
248   }
249   if (NULL != cadet)
250   {
251     GNUNET_CADET_disconnect (cadet);
252     cadet = NULL;
253   }
254 }
255
256 /**
257  * Returns CADET channel established to given peer
258  * or creates a new one
259  *
260  * @param pid Peer Identity
261  * @return Context of established cadet channel
262  */
263 static struct CadetChannelContext *
264 get_cadet_channel (struct GNUNET_PeerIdentity pid)
265 {
266   struct CadetChannelContext *cc;
267
268   cc = cc_head;
269   while (NULL != cc)
270   {
271     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cc->pid))
272       return cc;
273     cc = cc->next;
274   }
275   cc = GNUNET_new (struct CadetChannelContext);
276   cc->c = GNUNET_CADET_channel_create(cadet,
277       cc,
278       &pid,
279       GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
280       GNUNET_CADET_OPTION_DEFAULT);
281   cc->pid = pid;
282   cc->sending = GNUNET_NO;
283   GNUNET_CONTAINER_DLL_insert (cc_head, cc_tail, cc);
284   return cc;
285 }
286
287 /**
288  * Construct a reading message ready to be sent over CADET channel
289  *
290  * @param rc reporting context to read data from
291  * @param msg used to return the created message structure
292  * @return size of created message
293  */
294 static size_t
295 construct_reading_message (struct ReportingContext *rc,
296     struct GNUNET_SENSOR_Reading **msg)
297 {
298   struct GNUNET_SENSOR_Reading *ret;
299   size_t sensorname_size;
300   size_t total_size;
301   void *dummy;
302
303   sensorname_size = strlen (rc->sensor->name) + 1;
304   total_size = sizeof(struct GNUNET_SENSOR_Reading) +
305       sensorname_size +
306       rc->last_value_size;
307   ret = GNUNET_malloc (total_size);
308   ret->header->size = htons (total_size);
309   ret->header->type = htons (GNUNET_MESSAGE_TYPE_SENSOR_READING);
310   ret->sensorname_size = GNUNET_htobe64 (sensorname_size);
311   ret->sensorversion_major = htons (rc->sensor->version_major);
312   ret->sensorversion_minor = htons (rc->sensor->version_minor);
313   ret->timestamp = GNUNET_htobe64 (rc->timestamp);
314   ret->value_size = GNUNET_htobe64 (rc->last_value_size);
315   dummy = &ret[1];
316   memcpy (dummy, rc->sensor->name, sensorname_size);
317   dummy += sensorname_size;
318   memcpy (dummy, rc->last_value, rc->last_value_size);
319   *msg = ret;
320   return total_size;
321 }
322
323 /**
324  * Function called to notify a client about the connection begin ready
325  * to queue more data.  @a buf will be NULL and @a size zero if the
326  * connection was closed for writing in the meantime.
327  *
328  * @param cls closure
329  * @param size number of bytes available in @a buf
330  * @param buf where the callee should write the message
331  * @return number of bytes written to @a buf
332  */
333 static size_t
334 do_report_collection_point (void *cls, size_t size, void *buf)
335 {
336   struct CadetChannelContext *cc = cls;
337   size_t written = 0;
338
339   LOG (GNUNET_ERROR_TYPE_DEBUG, "Copying to CADET transmit buffer.\n");
340   cc->sending = GNUNET_NO;
341   if (NULL == buf || size != cc->pending_msg_size)
342   {
343     LOG (GNUNET_ERROR_TYPE_WARNING,
344         "CADET failed to transmit message to collection point, discarding.");
345   }
346   else
347   {
348     memcpy (buf, cc->pending_msg, cc->pending_msg_size);
349     written = cc->pending_msg_size;
350   }
351   GNUNET_free (cc->pending_msg);
352   cc->pending_msg_size = 0;
353   return written;
354 }
355
356 /**
357  * Task scheduled to send values to collection point
358  *
359  * @param cls closure, a 'struct CollectionReportingContext *'
360  * @param tc unused
361  */
362 static void report_collection_point
363 (void *cls, const struct GNUNET_SCHEDULER_TaskContext* tc)
364 {
365   struct ReportingContext *rc = cls;
366   struct SensorInfo *sensor = rc->sensor;
367   struct CadetChannelContext *cc;
368   struct GNUNET_SENSOR_Reading *msg;
369   size_t msg_size;
370
371   rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
372   if (0 == rc->last_value_size) /* Did not receive a sensor value yet */
373   {
374     LOG (GNUNET_ERROR_TYPE_WARNING, "Did not receive a value from `%s' "
375         "to report yet.\n", rc->sensor->name);
376     rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
377             &report_collection_point, rc);
378     return;
379   }
380   LOG (GNUNET_ERROR_TYPE_DEBUG, "Now trying to report last seen value of `%s' "
381       "to collection point.\n", rc->sensor->name);
382   GNUNET_assert (NULL != sensor->collection_point);
383   cc = get_cadet_channel (*sensor->collection_point);
384   if (GNUNET_YES == cc->sending)
385   {
386     LOG (GNUNET_ERROR_TYPE_DEBUG,
387         "Cadet channel to collection point busy, "
388         "trying again for sensor `%s' on next interval.\n", rc->sensor->name);
389     rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
390             &report_collection_point, rc);
391     return;
392   }
393   msg_size = construct_reading_message (rc, &msg);
394   cc->sending = GNUNET_YES;
395   cc->pending_msg = msg;
396   cc->pending_msg_size = msg_size;
397   cc->th = GNUNET_CADET_notify_transmit_ready (cc->c,
398       GNUNET_YES,
399       sensor->collection_interval,
400       msg_size,
401       &do_report_collection_point,
402       cc);
403   rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
404       &report_collection_point, rc);
405 }
406
407 /*
408  * Sensor value watch callback
409  */
410 static int
411 sensor_watch_cb (void *cls,
412     struct GNUNET_PEERSTORE_Record *record,
413     char *emsg)
414 {
415   struct ReportingContext *rc = cls;
416
417   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a sensor `%s' watch value, "
418       "updating notification last_value.\n", rc->sensor->name);
419   if (NULL != emsg)
420     return GNUNET_YES;
421   if (NULL != rc->last_value)
422   {
423     GNUNET_free (rc->last_value);
424     rc->last_value_size = 0;
425   }
426   rc->last_value = GNUNET_malloc(record->value_size);
427   memcpy (rc->last_value, record->value, record->value_size);
428   rc->last_value_size = record->value_size;
429   rc->timestamp = GNUNET_TIME_absolute_get().abs_value_us;
430   return GNUNET_YES;
431 }
432
433 /**
434  * Iterator for defined sensors
435  * Watches sensors for readings to report
436  *
437  * @param cls unused
438  * @param key unused
439  * @param value a 'struct SensorInfo *' with sensor information
440  * @return #GNUNET_YES to continue iterations
441  */
442 static int
443 init_sensor_reporting (void *cls,
444     const struct GNUNET_HashCode *key,
445     void *value)
446 {
447   struct SensorInfo *sensor = value;
448   struct ReportingContext *rc;
449
450   if (NULL == sensor->collection_point &&
451       GNUNET_NO == sensor->p2p_report)
452     return GNUNET_YES;
453   rc = GNUNET_new (struct ReportingContext);
454   rc->sensor = sensor;
455   rc->last_value = NULL;
456   rc->last_value_size = 0;
457   rc->wc = GNUNET_PEERSTORE_watch(peerstore,
458       "sensor",
459       &mypeerid,
460       sensor->name,
461       &sensor_watch_cb,
462       rc);
463   if (NULL != sensor->collection_point)
464   {
465     LOG (GNUNET_ERROR_TYPE_INFO,
466         "Will start reporting sensor `%s' values to "
467         "collection point `%s' every %s.\n",
468         sensor->name, GNUNET_i2s_full(sensor->collection_point),
469         GNUNET_STRINGS_relative_time_to_string(sensor->collection_interval,
470             GNUNET_YES));
471     rc->cp_task =
472         GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
473             &report_collection_point,
474             rc);
475   }
476   if (GNUNET_YES == sensor->p2p_report)
477   {
478     LOG (GNUNET_ERROR_TYPE_INFO,
479         "Will start reporting sensor `%s' values to p2p network every %s.\n",
480         sensor->name,
481         GNUNET_STRINGS_relative_time_to_string(sensor->p2p_interval,
482             GNUNET_YES));
483   }
484   GNUNET_CONTAINER_DLL_insert (rc_head, rc_tail, rc);
485   return GNUNET_YES;
486 }
487
488 /**
489  * Function called whenever a channel is destroyed.  Should clean up
490  * any associated state.
491  *
492  * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
493  *
494  * @param cls closure (set from #GNUNET_CADET_connect)
495  * @param channel connection to the other end (henceforth invalid)
496  * @param channel_ctx place where local state associated
497  *                   with the channel is stored
498  */
499 static void cadet_channel_destroyed (void *cls,
500     const struct GNUNET_CADET_Channel *channel,
501     void *channel_ctx)
502 {
503   struct CadetChannelContext *cc = channel_ctx;
504
505   LOG (GNUNET_ERROR_TYPE_DEBUG,
506       "Received a `channel destroyed' notification from CADET, "
507       "cleaning up.\n");
508   GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
509   cc->c = NULL;
510   destroy_cadet_channel_context (cc);
511 }
512
513 /**
514  * Start the sensor reporting module
515  *
516  * @param c our service configuration
517  * @param sensors multihashmap of loaded sensors
518  * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
519  */
520 int
521 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
522     struct GNUNET_CONTAINER_MultiHashMap *sensors)
523 {
524   static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
525       {NULL, 0, 0}
526   };
527
528   LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n");
529   GNUNET_assert(NULL != sensors);
530   cfg = c;
531   peerstore = GNUNET_PEERSTORE_connect(cfg);
532   if (NULL == peerstore)
533   {
534     LOG (GNUNET_ERROR_TYPE_ERROR,
535         _("Failed to connect to peerstore service.\n"));
536     SENSOR_reporting_stop ();
537     return GNUNET_SYSERR;
538   }
539   cadet = GNUNET_CADET_connect(cfg,
540       NULL,
541       NULL,
542       &cadet_channel_destroyed,
543       cadet_handlers,
544       NULL);
545   if (NULL == cadet)
546   {
547     LOG (GNUNET_ERROR_TYPE_ERROR,
548         _("Failed to connect to CADET service.\n"));
549     SENSOR_reporting_stop ();
550     return GNUNET_SYSERR;
551   }
552   GNUNET_CRYPTO_get_peer_identity(cfg, &mypeerid);
553   GNUNET_CONTAINER_multihashmap_iterate(sensors, &init_sensor_reporting, NULL);
554
555   return GNUNET_OK;
556 }
557
558 /* end of gnunet-service-sensor-reporting.c */