-fixing doxygen, indentation
[oweals/gnunet.git] / src / sensor / gnunet-service-sensor-reporting.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file sensor/gnunet-service-sensor-reporting.c
23  * @brief sensor service reporting functionality
24  * @author Omar Tarabai
25  */
26 #include <inttypes.h>
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "sensor.h"
30 #include "gnunet_peerstore_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
35
36 /**
37  * Retry interval (seconds) in case channel to collection point is busy
38  */
39 #define COLLECTION_RETRY 1
40
41 /**
42  * Context of reporting operations
43  */
44 struct ReportingContext
45 {
46
47   /**
48    * DLL
49    */
50   struct ReportingContext *prev;
51
52   /**
53    * DLL
54    */
55   struct ReportingContext *next;
56
57   /**
58    * Sensor information
59    */
60   struct SensorInfo *sensor;
61
62   /**
63    * Collection point reporting task
64    * (or #GNUNET_SCHEDULER_NO_TASK)
65    */
66   GNUNET_SCHEDULER_TaskIdentifier cp_task;
67
68   /**
69    * Watcher of sensor values
70    */
71   struct GNUNET_PEERSTORE_WatchContext *wc;
72
73   /**
74    * Last value read from sensor
75    */
76   void *last_value;
77
78   /**
79    * Size of @e last_value
80    */
81   size_t last_value_size;
82
83   /**
84    * Timestamp of last value reading
85    */
86   uint64_t timestamp;
87
88 };
89
90 /**
91  * Context of a created CADET channel
92  */
93 struct CadetChannelContext
94 {
95
96   /**
97    * DLL
98    */
99   struct CadetChannelContext *prev;
100
101   /**
102    * DLL
103    */
104   struct CadetChannelContext *next;
105
106   /**
107    * Peer Id of
108    */
109   struct GNUNET_PeerIdentity pid;
110
111   /**
112    * CADET channel handle
113    */
114   struct GNUNET_CADET_Channel *c;
115
116   /**
117    * Are we sending data on this channel?
118    * #GNUNET_YES / #GNUNET_NO
119    */
120   int sending;
121
122   /**
123    * Pointer to a pending message to be sent over the channel
124    */
125   void *pending_msg;
126
127   /**
128    * Size of @e pending_msg
129    */
130   size_t pending_msg_size;
131
132   /**
133    * Handle to CADET tranmission request in case we are sending
134    * (sending == #GNUNET_YES)
135    */
136   struct GNUNET_CADET_TransmitHandle *th;
137
138   /**
139    * Are we currently destroying the channel and its context?
140    */
141   int destroying;
142
143 };
144
145 /**
146  * Our configuration.
147  */
148 static const struct GNUNET_CONFIGURATION_Handle *cfg;
149
150 /**
151  * Handle to peerstore service
152  */
153 static struct GNUNET_PEERSTORE_Handle *peerstore;
154
155 /**
156  * My peer id
157  */
158 static struct GNUNET_PeerIdentity mypeerid;
159
160 /**
161  * Handle to CADET service
162  */
163 static struct GNUNET_CADET_Handle *cadet;
164
165 /**
166  * Head of DLL of all reporting contexts
167  */
168 struct ReportingContext *rc_head;
169
170 /**
171  * Tail of DLL of all reporting contexts
172  */
173 struct ReportingContext *rc_tail;
174
175 /**
176  * Head of DLL of all cadet channels
177  */
178 struct CadetChannelContext *cc_head;
179
180 /**
181  * Tail of DLL of all cadet channels
182  */
183 struct CadetChannelContext *cc_tail;
184
185
186 /**
187  * Destroy a reporting context structure
188  */
189 static void
190 destroy_reporting_context (struct ReportingContext *rc)
191 {
192   if (NULL != rc->wc)
193   {
194     GNUNET_PEERSTORE_watch_cancel (rc->wc);
195     rc->wc = NULL;
196   }
197   if (GNUNET_SCHEDULER_NO_TASK != rc->cp_task)
198   {
199     GNUNET_SCHEDULER_cancel(rc->cp_task);
200     rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
201   }
202   if (NULL != rc->last_value)
203   {
204     GNUNET_free (rc->last_value);
205     rc->last_value_size = 0;
206   }
207   GNUNET_free(rc);
208 }
209
210 /**
211  * Destroy a CADET channel context struct
212  */
213 static void
214 destroy_cadet_channel_context (struct CadetChannelContext *cc)
215 {
216   cc->destroying = GNUNET_YES;
217   if (NULL != cc->th)
218   {
219     GNUNET_CADET_notify_transmit_ready_cancel (cc->th);
220     cc->th = NULL;
221   }
222   if (NULL != cc->pending_msg)
223   {
224     GNUNET_free (cc->pending_msg);
225     cc->pending_msg = NULL;
226   }
227   if (NULL != cc->c)
228   {
229     GNUNET_CADET_channel_destroy (cc->c);
230     cc->c = NULL;
231   }
232   GNUNET_free (cc);
233 }
234
235
236 /**
237  * Stop sensor reporting module
238  */
239 void
240 SENSOR_reporting_stop ()
241 {
242   struct ReportingContext *rc;
243   struct CadetChannelContext *cc;
244
245   LOG (GNUNET_ERROR_TYPE_DEBUG,
246        "Stopping sensor reporting module.\n");
247   while (NULL != cc_head)
248   {
249     cc = cc_head;
250     GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
251     destroy_cadet_channel_context (cc);
252   }
253   while (NULL != rc_head)
254   {
255     rc = rc_head;
256     GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, rc);
257     destroy_reporting_context (rc);
258   }
259   if (NULL != peerstore)
260   {
261     GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_YES);
262     peerstore = NULL;
263   }
264   if (NULL != cadet)
265   {
266     GNUNET_CADET_disconnect (cadet);
267     cadet = NULL;
268   }
269 }
270
271
272 /**
273  * Returns CADET channel established to given peer
274  * or creates a new one
275  *
276  * @param pid Peer Identity
277  * @return Context of established cadet channel
278  */
279 static struct CadetChannelContext *
280 get_cadet_channel (struct GNUNET_PeerIdentity pid)
281 {
282   struct CadetChannelContext *cc;
283
284   cc = cc_head;
285   while (NULL != cc)
286   {
287     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cc->pid))
288       return cc;
289     cc = cc->next;
290   }
291   cc = GNUNET_new (struct CadetChannelContext);
292   cc->c = GNUNET_CADET_channel_create(cadet,
293       cc,
294       &pid,
295       GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
296       GNUNET_CADET_OPTION_DEFAULT);
297   cc->pid = pid;
298   cc->sending = GNUNET_NO;
299   cc->destroying = GNUNET_NO;
300   GNUNET_CONTAINER_DLL_insert (cc_head, cc_tail, cc);
301   return cc;
302 }
303
304
305 /**
306  * Construct a reading message ready to be sent over CADET channel
307  *
308  * @param rc reporting context to read data from
309  * @param msg used to return the created message structure
310  * @return size of created message
311  */
312 static size_t
313 construct_reading_message (struct ReportingContext *rc,
314                            struct GNUNET_SENSOR_ReadingMessage **msg)
315 {
316   struct GNUNET_SENSOR_ReadingMessage *ret;
317   uint16_t sensorname_size;
318   uint16_t total_size;
319   void *dummy;
320
321   sensorname_size = strlen (rc->sensor->name) + 1;
322   total_size = sizeof(struct GNUNET_SENSOR_ReadingMessage) +
323       sensorname_size +
324       rc->last_value_size;
325   ret = GNUNET_malloc (total_size);
326   ret->header.size = htons (total_size);
327   ret->header.type = htons (GNUNET_MESSAGE_TYPE_SENSOR_READING);
328   ret->sensorname_size = htons (sensorname_size);
329   ret->sensorversion_major = htons (rc->sensor->version_major);
330   ret->sensorversion_minor = htons (rc->sensor->version_minor);
331   ret->timestamp = GNUNET_htobe64 (rc->timestamp);
332   ret->value_size = htons (rc->last_value_size);
333   dummy = &ret[1];
334   memcpy (dummy, rc->sensor->name, sensorname_size);
335   dummy += sensorname_size;
336   memcpy (dummy, rc->last_value, rc->last_value_size);
337   *msg = ret;
338   return total_size;
339 }
340
341
342 /**
343  * Function called to notify a client about the connection begin ready
344  * to queue more data.  @a buf will be NULL and @a size zero if the
345  * connection was closed for writing in the meantime.
346  *
347  * @param cls closure
348  * @param size number of bytes available in @a buf
349  * @param buf where the callee should write the message
350  * @return number of bytes written to @a buf
351  */
352 static size_t
353 do_report_collection_point (void *cls, size_t size, void *buf)
354 {
355   struct CadetChannelContext *cc = cls;
356   size_t written = 0;
357
358   cc->th = NULL;
359   cc->sending = GNUNET_NO;
360   LOG (GNUNET_ERROR_TYPE_DEBUG,
361        "Copying to CADET transmit buffer.\n");
362   if (NULL == buf)
363   {
364     LOG (GNUNET_ERROR_TYPE_WARNING,
365         "CADET failed to transmit message (NULL buf), discarding.\n");
366   }
367   else if (size < cc->pending_msg_size)
368   {
369     LOG (GNUNET_ERROR_TYPE_WARNING,
370         "CADET failed to transmit message (small size, expected: %u, got: %u)"
371         ", discarding.\n", cc->pending_msg_size, size);
372   }
373   else
374   {
375     memcpy (buf, cc->pending_msg, cc->pending_msg_size);
376     written = cc->pending_msg_size;
377   }
378   GNUNET_free (cc->pending_msg);
379   cc->pending_msg = NULL;
380   cc->pending_msg_size = 0;
381   return written;
382 }
383
384
385 /**
386  * Task scheduled to send values to collection point
387  *
388  * @param cls closure, a `struct CollectionReportingContext *`
389  * @param tc unused
390  */
391 static void
392 report_collection_point (void *cls,
393                          const struct GNUNET_SCHEDULER_TaskContext* tc)
394 {
395   struct ReportingContext *rc = cls;
396   struct SensorInfo *sensor = rc->sensor;
397   struct CadetChannelContext *cc;
398   struct GNUNET_SENSOR_ReadingMessage *msg;
399   size_t msg_size;
400
401   rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
402   if (0 == rc->last_value_size) /* Did not receive a sensor value yet */
403   {
404     LOG (GNUNET_ERROR_TYPE_WARNING,
405          "Did not receive a value from `%s' to report yet.\n",
406          rc->sensor->name);
407     rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
408             &report_collection_point, rc);
409     return;
410   }
411   LOG (GNUNET_ERROR_TYPE_DEBUG,
412        "Now trying to report last seen value of `%s' "
413        "to collection point.\n",
414        rc->sensor->name);
415   GNUNET_assert (NULL != sensor->collection_point);
416   cc = get_cadet_channel (*sensor->collection_point);
417   if (GNUNET_YES == cc->sending)
418   {
419     LOG (GNUNET_ERROR_TYPE_DEBUG,
420          "Cadet channel to collection point busy, "
421          "trying again for sensor `%s' after %d seconds.\n",
422          rc->sensor->name,
423          COLLECTION_RETRY);
424     rc->cp_task = GNUNET_SCHEDULER_add_delayed (
425       GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, COLLECTION_RETRY),
426       &report_collection_point, rc);
427     return;
428   }
429   msg_size = construct_reading_message (rc, &msg);
430   cc->sending = GNUNET_YES;
431   cc->pending_msg = msg;
432   cc->pending_msg_size = msg_size;
433   cc->th = GNUNET_CADET_notify_transmit_ready (cc->c,
434       GNUNET_YES,
435       sensor->collection_interval,
436       msg_size,
437       &do_report_collection_point,
438       cc);
439   rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
440       &report_collection_point, rc);
441 }
442
443
444 /**
445  * Sensor value watch callback
446  */
447 static int
448 sensor_watch_cb (void *cls,
449     struct GNUNET_PEERSTORE_Record *record,
450     char *emsg)
451 {
452   struct ReportingContext *rc = cls;
453
454   if (NULL != emsg)
455     return GNUNET_YES;
456   if (NULL != rc->last_value)
457   {
458     GNUNET_free (rc->last_value);
459     rc->last_value_size = 0;
460   }
461   rc->last_value = GNUNET_malloc(record->value_size);
462   memcpy (rc->last_value, record->value, record->value_size);
463   rc->last_value_size = record->value_size;
464   rc->timestamp = GNUNET_TIME_absolute_get().abs_value_us;
465   LOG (GNUNET_ERROR_TYPE_DEBUG,
466        "Received a sensor `%s' watch value at "
467        "timestamp %" PRIu64 ", updating notification last_value.\n",
468        rc->sensor->name,
469        rc->timestamp);
470   return GNUNET_YES;
471 }
472
473
474 /**
475  * Iterator for defined sensors
476  * Watches sensors for readings to report
477  *
478  * @param cls unused
479  * @param key unused
480  * @param value a 'struct SensorInfo *' with sensor information
481  * @return #GNUNET_YES to continue iterations
482  */
483 static int
484 init_sensor_reporting (void *cls,
485     const struct GNUNET_HashCode *key,
486     void *value)
487 {
488   struct SensorInfo *sensor = value;
489   struct ReportingContext *rc;
490
491   if (NULL == sensor->collection_point &&
492       GNUNET_NO == sensor->p2p_report)
493     return GNUNET_YES;
494   rc = GNUNET_new (struct ReportingContext);
495   rc->sensor = sensor;
496   rc->last_value = NULL;
497   rc->last_value_size = 0;
498   rc->wc = GNUNET_PEERSTORE_watch(peerstore,
499       "sensor",
500       &mypeerid,
501       sensor->name,
502       &sensor_watch_cb,
503       rc);
504   if (NULL != sensor->collection_point)
505   {
506     LOG (GNUNET_ERROR_TYPE_INFO,
507         "Will start reporting sensor `%s' values to "
508         "collection point `%s' every %s.\n",
509         sensor->name, GNUNET_i2s_full(sensor->collection_point),
510         GNUNET_STRINGS_relative_time_to_string(sensor->collection_interval,
511             GNUNET_YES));
512     rc->cp_task =
513         GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
514             &report_collection_point,
515             rc);
516   }
517   if (GNUNET_YES == sensor->p2p_report)
518   {
519     LOG (GNUNET_ERROR_TYPE_INFO,
520         "Will start reporting sensor `%s' values to p2p network every %s.\n",
521         sensor->name,
522         GNUNET_STRINGS_relative_time_to_string(sensor->p2p_interval,
523             GNUNET_YES));
524   }
525   GNUNET_CONTAINER_DLL_insert (rc_head, rc_tail, rc);
526   return GNUNET_YES;
527 }
528
529
530 /**
531  * Function called whenever a channel is destroyed.  Should clean up
532  * any associated state.
533  *
534  * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
535  *
536  * @param cls closure (set from #GNUNET_CADET_connect)
537  * @param channel connection to the other end (henceforth invalid)
538  * @param channel_ctx place where local state associated
539  *                   with the channel is stored
540  */
541 static void
542 cadet_channel_destroyed (void *cls,
543                          const struct GNUNET_CADET_Channel *channel,
544                          void *channel_ctx)
545 {
546   struct CadetChannelContext *cc = channel_ctx;
547
548   if (GNUNET_YES == cc->destroying)
549     return;
550   LOG (GNUNET_ERROR_TYPE_DEBUG,
551       "Received a `channel destroyed' notification from CADET, "
552       "cleaning up.\n");
553   GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
554   cc->c = NULL;
555   destroy_cadet_channel_context (cc);
556 }
557
558 /**
559  * Start the sensor reporting module
560  *
561  * @param c our service configuration
562  * @param sensors multihashmap of loaded sensors
563  * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
564  */
565 int
566 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
567                         struct GNUNET_CONTAINER_MultiHashMap *sensors)
568 {
569   static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
570       {NULL, 0, 0}
571   };
572
573   LOG (GNUNET_ERROR_TYPE_DEBUG,
574        "Starting sensor reporting module.\n");
575   GNUNET_assert(NULL != sensors);
576   cfg = c;
577   peerstore = GNUNET_PEERSTORE_connect(cfg);
578   if (NULL == peerstore)
579   {
580     LOG (GNUNET_ERROR_TYPE_ERROR,
581         _("Failed to connect to peerstore service.\n"));
582     SENSOR_reporting_stop ();
583     return GNUNET_SYSERR;
584   }
585   cadet = GNUNET_CADET_connect(cfg,
586       NULL,
587       NULL,
588       &cadet_channel_destroyed,
589       cadet_handlers,
590       NULL);
591   if (NULL == cadet)
592   {
593     LOG (GNUNET_ERROR_TYPE_ERROR,
594          _("Failed to connect to CADET service.\n"));
595     SENSOR_reporting_stop ();
596     return GNUNET_SYSERR;
597   }
598   GNUNET_CRYPTO_get_peer_identity (cfg,
599                                    &mypeerid);
600   GNUNET_CONTAINER_multihashmap_iterate(sensors,
601                                         &init_sensor_reporting, NULL);
602   return GNUNET_OK;
603 }
604
605 /* end of gnunet-service-sensor-reporting.c */