minor fix
[oweals/gnunet.git] / src / sensor / gnunet-service-sensor_reporting_value.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file sensor/gnunet-service-sensor_reporting_value.c
23  * @brief sensor service value reporting functionality
24  * @author Omar Tarabai
25  */
26 #include <inttypes.h>
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "sensor.h"
30 #include "gnunet_peerstore_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting-value",__VA_ARGS__)
35
36 /**
37  * Retry interval (seconds) in case channel to collection point is busy
38  */
39 #define COLLECTION_RETRY 1
40
41 /**
42  * Context of reporting sensor values
43  */
44 struct ValueReportingContext
45 {
46
47   /**
48    * DLL
49    */
50   struct ValueReportingContext *prev;
51
52   /**
53    * DLL
54    */
55   struct ValueReportingContext *next;
56
57   /**
58    * Sensor information
59    */
60   struct GNUNET_SENSOR_SensorInfo *sensor;
61
62   /**
63    * Collection point reporting task
64    * (or #GNUNET_SCHEDULER_NO_TASK)
65    */
66   GNUNET_SCHEDULER_TaskIdentifier cp_task;
67
68   /**
69    * Watcher of sensor values
70    */
71   struct GNUNET_PEERSTORE_WatchContext *wc;
72
73   /**
74    * Last value read from sensor
75    */
76   void *last_value;
77
78   /**
79    * Size of @e last_value
80    */
81   size_t last_value_size;
82
83   /**
84    * Timestamp of last value reading
85    */
86   uint64_t timestamp;
87
88 };
89
90 /**
91  * Context of a created CADET channel
92  */
93 struct CadetChannelContext
94 {
95
96   /**
97    * DLL
98    */
99   struct CadetChannelContext *prev;
100
101   /**
102    * DLL
103    */
104   struct CadetChannelContext *next;
105
106   /**
107    * Peer Id of
108    */
109   struct GNUNET_PeerIdentity pid;
110
111   /**
112    * CADET channel handle
113    */
114   struct GNUNET_CADET_Channel *c;
115
116   /**
117    * Are we sending data on this channel?
118    * #GNUNET_YES / #GNUNET_NO
119    */
120   int sending;
121
122   /**
123    * Pointer to a pending message to be sent over the channel
124    */
125   void *pending_msg;
126
127   /**
128    * Size of @e pending_msg
129    */
130   size_t pending_msg_size;
131
132   /**
133    * Handle to CADET tranmission request in case we are sending
134    * (sending == #GNUNET_YES)
135    */
136   struct GNUNET_CADET_TransmitHandle *th;
137
138   /**
139    * Are we currently destroying the channel and its context?
140    */
141   int destroying;
142
143 };
144
145 /**
146  * Our configuration.
147  */
148 static const struct GNUNET_CONFIGURATION_Handle *cfg;
149
150 /**
151  * Handle to peerstore service
152  */
153 static struct GNUNET_PEERSTORE_Handle *peerstore;
154
155 /**
156  * My peer id
157  */
158 static struct GNUNET_PeerIdentity mypeerid;
159
160 /**
161  * Handle to CADET service
162  */
163 static struct GNUNET_CADET_Handle *cadet;
164
165 /**
166  * Head of DLL of all reporting contexts
167  */
168 struct ValueReportingContext *vrc_head;
169
170 /**
171  * Tail of DLL of all reporting contexts
172  */
173 struct ValueReportingContext *vrc_tail;
174
175 /**
176  * Head of DLL of all cadet channels
177  */
178 struct CadetChannelContext *cc_head;
179
180 /**
181  * Tail of DLL of all cadet channels
182  */
183 struct CadetChannelContext *cc_tail;
184
185 /**
186  * Destroy a reporting context structure
187  */
188 static void
189 destroy_value_reporting_context (struct ValueReportingContext *vrc)
190 {
191   if (NULL != vrc->wc)
192   {
193     GNUNET_PEERSTORE_watch_cancel (vrc->wc);
194     vrc->wc = NULL;
195   }
196   if (GNUNET_SCHEDULER_NO_TASK != vrc->cp_task)
197   {
198     GNUNET_SCHEDULER_cancel (vrc->cp_task);
199     vrc->cp_task = GNUNET_SCHEDULER_NO_TASK;
200   }
201   if (NULL != vrc->last_value)
202   {
203     GNUNET_free (vrc->last_value);
204     vrc->last_value_size = 0;
205   }
206   GNUNET_free (vrc);
207 }
208
209
210 /**
211  * Destroy a CADET channel context struct
212  */
213 static void
214 destroy_cadet_channel_context (struct CadetChannelContext *cc)
215 {
216   cc->destroying = GNUNET_YES;
217   if (NULL != cc->th)
218   {
219     GNUNET_CADET_notify_transmit_ready_cancel (cc->th);
220     cc->th = NULL;
221   }
222   if (NULL != cc->pending_msg)
223   {
224     GNUNET_free (cc->pending_msg);
225     cc->pending_msg = NULL;
226   }
227   if (NULL != cc->c)
228   {
229     GNUNET_CADET_channel_destroy (cc->c);
230     cc->c = NULL;
231   }
232   GNUNET_free (cc);
233 }
234
235
236 /**
237  * Stop sensor value reporting module
238  */
239 void
240 SENSOR_reporting_value_stop ()
241 {
242   struct ValueReportingContext *vrc;
243   struct CadetChannelContext *cc;
244
245   LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor value reporting module.\n");
246   while (NULL != cc_head)
247   {
248     cc = cc_head;
249     GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
250     destroy_cadet_channel_context (cc);
251   }
252   while (NULL != vrc_head)
253   {
254     vrc = vrc_head;
255     GNUNET_CONTAINER_DLL_remove (vrc_head, vrc_tail, vrc);
256     destroy_value_reporting_context (vrc);
257   }
258   if (NULL != peerstore)
259   {
260     GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_YES);
261     peerstore = NULL;
262   }
263   if (NULL != cadet)
264   {
265     GNUNET_CADET_disconnect (cadet);
266     cadet = NULL;
267   }
268 }
269
270
271 /**
272  * Returns CADET channel established to given peer or creates a new one.
273  *
274  * @param pid Peer Identity
275  * @return Context of established cadet channel
276  */
277 static struct CadetChannelContext *
278 get_cadet_channel (struct GNUNET_PeerIdentity pid)
279 {
280   struct CadetChannelContext *cc;
281
282   cc = cc_head;
283   while (NULL != cc)
284   {
285     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cc->pid))
286       return cc;
287     cc = cc->next;
288   }
289   cc = GNUNET_new (struct CadetChannelContext);
290   cc->c =
291       GNUNET_CADET_channel_create (cadet, cc, &pid,
292                                    GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
293                                    GNUNET_CADET_OPTION_DEFAULT);
294   cc->pid = pid;
295   cc->sending = GNUNET_NO;
296   cc->destroying = GNUNET_NO;
297   GNUNET_CONTAINER_DLL_insert (cc_head, cc_tail, cc);
298   return cc;
299 }
300
301
302 /**
303  * Construct a reading message ready to be sent over CADET channel
304  *
305  * @param rc reporting context to read data from
306  * @param msg used to return the created message structure
307  * @return size of created message
308  */
309 static size_t
310 construct_reading_message (struct ValueReportingContext *vrc,
311                            struct GNUNET_SENSOR_ReadingMessage **msg)
312 {
313   struct GNUNET_SENSOR_ReadingMessage *ret;
314   uint16_t sensorname_size;
315   uint16_t total_size;
316   void *dummy;
317
318   sensorname_size = strlen (vrc->sensor->name) + 1;
319   total_size =
320       sizeof (struct GNUNET_SENSOR_ReadingMessage) + sensorname_size +
321       vrc->last_value_size;
322   ret = GNUNET_malloc (total_size);
323   ret->header.size = htons (total_size);
324   ret->header.type = htons (GNUNET_MESSAGE_TYPE_SENSOR_READING);
325   ret->sensorname_size = htons (sensorname_size);
326   ret->sensorversion_major = htons (vrc->sensor->version_major);
327   ret->sensorversion_minor = htons (vrc->sensor->version_minor);
328   ret->timestamp = GNUNET_htobe64 (vrc->timestamp);
329   ret->value_size = htons (vrc->last_value_size);
330   dummy = &ret[1];
331   memcpy (dummy, vrc->sensor->name, sensorname_size);
332   dummy += sensorname_size;
333   memcpy (dummy, vrc->last_value, vrc->last_value_size);
334   *msg = ret;
335   return total_size;
336 }
337
338
339 /**
340  * Function called to notify a client about the connection begin ready
341  * to queue more data.  @a buf will be NULL and @a size zero if the
342  * connection was closed for writing in the meantime.
343  *
344  * @param cls closure
345  * @param size number of bytes available in @a buf
346  * @param buf where the callee should write the message
347  * @return number of bytes written to @a buf
348  */
349 static size_t
350 do_report_value (void *cls, size_t size, void *buf)
351 {
352   struct CadetChannelContext *cc = cls;
353   size_t written = 0;
354
355   cc->th = NULL;
356   cc->sending = GNUNET_NO;
357   LOG (GNUNET_ERROR_TYPE_DEBUG, "Copying to CADET transmit buffer.\n");
358   if (NULL == buf)
359   {
360     LOG (GNUNET_ERROR_TYPE_WARNING,
361          "CADET failed to transmit message (NULL buf), discarding.\n");
362   }
363   else if (size < cc->pending_msg_size)
364   {
365     LOG (GNUNET_ERROR_TYPE_WARNING,
366          "CADET failed to transmit message (small size, expected: %u, got: %u)"
367          ", discarding.\n", cc->pending_msg_size, size);
368   }
369   else
370   {
371     memcpy (buf, cc->pending_msg, cc->pending_msg_size);
372     written = cc->pending_msg_size;
373   }
374   GNUNET_free (cc->pending_msg);
375   cc->pending_msg = NULL;
376   cc->pending_msg_size = 0;
377   return written;
378 }
379
380
381 /**
382  * Task scheduled to send values to collection point
383  *
384  * @param cls closure, a `struct ValueReportingContext *`
385  * @param tc unused
386  */
387 static void
388 report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
389 {
390   struct ValueReportingContext *vrc = cls;
391   struct GNUNET_SENSOR_SensorInfo *sensor = vrc->sensor;
392   struct CadetChannelContext *cc;
393   struct GNUNET_SENSOR_ReadingMessage *msg;
394   size_t msg_size;
395
396   vrc->cp_task = GNUNET_SCHEDULER_NO_TASK;
397   if (0 == vrc->last_value_size)        /* Did not receive a sensor value yet */
398   {
399     LOG (GNUNET_ERROR_TYPE_WARNING,
400          "Did not receive a value from `%s' to report yet.\n",
401          vrc->sensor->name);
402     vrc->cp_task =
403         GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
404                                       &report_value, vrc);
405     return;
406   }
407   LOG (GNUNET_ERROR_TYPE_DEBUG,
408        "Now trying to report last seen value of `%s' " "to collection point.\n",
409        vrc->sensor->name);
410   GNUNET_assert (NULL != sensor->collection_point);
411   cc = get_cadet_channel (*sensor->collection_point);
412   if (GNUNET_YES == cc->sending)
413   {
414     LOG (GNUNET_ERROR_TYPE_DEBUG,
415          "Cadet channel to collection point busy, "
416          "trying again for sensor `%s' after %d seconds.\n", vrc->sensor->name,
417          COLLECTION_RETRY);
418     vrc->cp_task =
419         GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
420                                       (GNUNET_TIME_UNIT_SECONDS,
421                                        COLLECTION_RETRY), &report_value, vrc);
422     return;
423   }
424   msg_size = construct_reading_message (vrc, &msg);
425   cc->sending = GNUNET_YES;
426   cc->pending_msg = msg;
427   cc->pending_msg_size = msg_size;
428   cc->th =
429       GNUNET_CADET_notify_transmit_ready (cc->c, GNUNET_YES,
430                                           sensor->value_reporting_interval,
431                                           msg_size, &do_report_value, cc);
432   vrc->cp_task =
433       GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
434                                     &report_value, vrc);
435 }
436
437
438 /**
439  * Sensor value watch callback
440  */
441 static int
442 value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
443 {
444   struct ValueReportingContext *vrc = cls;
445
446   if (NULL != emsg)
447     return GNUNET_YES;
448   if (NULL != vrc->last_value)
449   {
450     GNUNET_free (vrc->last_value);
451     vrc->last_value_size = 0;
452   }
453   vrc->last_value = GNUNET_malloc (record->value_size);
454   memcpy (vrc->last_value, record->value, record->value_size);
455   vrc->last_value_size = record->value_size;
456   vrc->timestamp = GNUNET_TIME_absolute_get ().abs_value_us;
457   LOG (GNUNET_ERROR_TYPE_DEBUG,
458        "Received a sensor `%s' watch value at " "timestamp %" PRIu64
459        ", updating notification last_value.\n", vrc->sensor->name,
460        vrc->timestamp);
461   return GNUNET_YES;
462 }
463
464
465 /**
466  * Function called whenever a channel is destroyed.  Should clean up
467  * any associated state.
468  *
469  * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
470  *
471  * @param cls closure (set from #GNUNET_CADET_connect)
472  * @param channel connection to the other end (henceforth invalid)
473  * @param channel_ctx place where local state associated
474  *                   with the channel is stored
475  */
476 static void
477 cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel,
478                          void *channel_ctx)
479 {
480   struct CadetChannelContext *cc = channel_ctx;
481
482   if (GNUNET_YES == cc->destroying)
483     return;
484   LOG (GNUNET_ERROR_TYPE_DEBUG,
485        "Received a `channel destroyed' notification from CADET, "
486        "cleaning up.\n");
487   GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
488   cc->c = NULL;
489   destroy_cadet_channel_context (cc);
490 }
491
492
493 /**
494  * Iterator for defined sensors
495  * Watches sensors for readings to report
496  *
497  * @param cls unused
498  * @param key unused
499  * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information
500  * @return #GNUNET_YES to continue iterations
501  */
502 static int
503 init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
504                        void *value)
505 {
506   struct GNUNET_SENSOR_SensorInfo *sensor = value;
507   struct ValueReportingContext *vrc;
508
509   if (NULL == sensor->collection_point || GNUNET_NO == sensor->report_values)
510     return GNUNET_YES;
511   LOG (GNUNET_ERROR_TYPE_INFO,
512        "Reporting sensor `%s' values to collection point `%s' every %s.\n",
513        sensor->name, GNUNET_i2s_full (sensor->collection_point),
514        GNUNET_STRINGS_relative_time_to_string (sensor->value_reporting_interval,
515                                                GNUNET_YES));
516   vrc = GNUNET_new (struct ValueReportingContext);
517   vrc->sensor = sensor;
518   vrc->last_value = NULL;
519   vrc->last_value_size = 0;
520   vrc->wc =
521       GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
522                               &value_watch_cb, vrc);
523   vrc->cp_task =
524       GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
525                                     &report_value, vrc);
526   GNUNET_CONTAINER_DLL_insert (vrc_head, vrc_tail, vrc);
527   return GNUNET_YES;
528 }
529
530
531 /**
532  * Start the sensor value reporting module
533  *
534  * @param c our service configuration
535  * @param sensors multihashmap of loaded sensors
536  * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
537  */
538 int
539 SENSOR_reporting_value_start (const struct GNUNET_CONFIGURATION_Handle *c,
540                               struct GNUNET_CONTAINER_MultiHashMap *sensors)
541 {
542   static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
543     {NULL, 0, 0}
544   };
545
546   LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor value reporting module.\n");
547   GNUNET_assert (NULL != sensors);
548   cfg = c;
549   peerstore = GNUNET_PEERSTORE_connect (cfg);
550   if (NULL == peerstore)
551   {
552     LOG (GNUNET_ERROR_TYPE_ERROR,
553          _("Failed to connect to peerstore service.\n"));
554     SENSOR_reporting_value_stop ();
555     return GNUNET_SYSERR;
556   }
557   cadet =
558       GNUNET_CADET_connect (cfg, NULL, NULL, &cadet_channel_destroyed,
559                             cadet_handlers, NULL);
560   if (NULL == cadet)
561   {
562     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CADET service.\n"));
563     SENSOR_reporting_value_stop ();
564     return GNUNET_SYSERR;
565   }
566   GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid);
567   GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL);
568   return GNUNET_OK;
569 }
570
571 /* end of gnunet-service-sensor_reporting_value.c */