completed sensordashboard + fixes
[oweals/gnunet.git] / src / sensor / gnunet-service-sensor-reporting.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file sensor/gnunet-service-sensor-reporting.c
23  * @brief sensor service reporting functionality
24  * @author Omar Tarabai
25  */
26 #include <inttypes.h>
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "sensor.h"
30 #include "gnunet_peerstore_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
35
36 /**
37  * Retry interval (seconds) in case channel to collection point is busy
38  */
39 #define COLLECTION_RETRY 1
40
41 /**
42  * Context of reporting operations
43  */
44 struct ReportingContext
45 {
46
47   /**
48    * DLL
49    */
50   struct ReportingContext *prev;
51
52   /**
53    * DLL
54    */
55   struct ReportingContext *next;
56
57   /**
58    * Sensor information
59    */
60   struct SensorInfo *sensor;
61
62   /**
63    * Collection point reporting task
64    * (OR GNUNET_SCHEDULER_NO_TASK)
65    */
66   GNUNET_SCHEDULER_TaskIdentifier cp_task;
67
68   /**
69    * Watcher of sensor values
70    */
71   struct GNUNET_PEERSTORE_WatchContext *wc;
72
73   /**
74    * Last value read from sensor
75    */
76   void *last_value;
77
78   /**
79    * Size of @last_value
80    */
81   size_t last_value_size;
82
83   /**
84    * Timestamp of last value reading
85    */
86   uint64_t timestamp;
87
88 };
89
90 /**
91  * Context of a created CADET channel
92  */
93 struct CadetChannelContext
94 {
95
96   /**
97    * DLL
98    */
99   struct CadetChannelContext *prev;
100
101   /**
102    * DLL
103    */
104   struct CadetChannelContext *next;
105
106   /**
107    * Peer Id of
108    */
109   struct GNUNET_PeerIdentity pid;
110
111   /**
112    * CADET channel handle
113    */
114   struct GNUNET_CADET_Channel *c;
115
116   /**
117    * Are we sending data on this channel?
118    * #GNUNET_YES / #GNUNET_NO
119    */
120   int sending;
121
122   /**
123    * Pointer to a pending message to be sent over the channel
124    */
125   void *pending_msg;
126
127   /**
128    * Size of @pending_msg
129    */
130   size_t pending_msg_size;
131
132   /**
133    * Handle to CADET tranmission request in case we are sending
134    * (sending == GNUNET_YES)
135    */
136   struct GNUNET_CADET_TransmitHandle *th;
137
138   /**
139    * Are we currently destroying the channel and its context?
140    */
141   int destroying;
142
143 };
144
145 /**
146  * Our configuration.
147  */
148 static const struct GNUNET_CONFIGURATION_Handle *cfg;
149
150 /**
151  * Handle to peerstore service
152  */
153 static struct GNUNET_PEERSTORE_Handle *peerstore;
154
155 /**
156  * My peer id
157  */
158 static struct GNUNET_PeerIdentity mypeerid;
159
160 /**
161  * Handle to CADET service
162  */
163 static struct GNUNET_CADET_Handle *cadet;
164
165 /**
166  * Head of DLL of all reporting contexts
167  */
168 struct ReportingContext *rc_head;
169
170 /**
171  * Tail of DLL of all reporting contexts
172  */
173 struct ReportingContext *rc_tail;
174
175 /**
176  * Head of DLL of all cadet channels
177  */
178 struct CadetChannelContext *cc_head;
179
180 /**
181  * Tail of DLL of all cadet channels
182  */
183 struct CadetChannelContext *cc_tail;
184
185
186 /**
187  * Destroy a reporting context structure
188  */
189 static void
190 destroy_reporting_context (struct ReportingContext *rc)
191 {
192   if (NULL != rc->wc)
193   {
194     GNUNET_PEERSTORE_watch_cancel (rc->wc);
195     rc->wc = NULL;
196   }
197   if (GNUNET_SCHEDULER_NO_TASK != rc->cp_task)
198   {
199     GNUNET_SCHEDULER_cancel(rc->cp_task);
200     rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
201   }
202   if (NULL != rc->last_value)
203   {
204     GNUNET_free (rc->last_value);
205     rc->last_value_size = 0;
206   }
207   GNUNET_free(rc);
208 }
209
210 /**
211  * Destroy a CADET channel context struct
212  */
213 static void
214 destroy_cadet_channel_context (struct CadetChannelContext *cc)
215 {
216   cc->destroying = GNUNET_YES;
217   if (NULL != cc->th)
218   {
219     GNUNET_CADET_notify_transmit_ready_cancel (cc->th);
220     cc->th = NULL;
221   }
222   if (NULL != cc->pending_msg)
223   {
224     GNUNET_free (cc->pending_msg);
225     cc->pending_msg = NULL;
226   }
227   if (NULL != cc->c)
228   {
229     GNUNET_CADET_channel_destroy (cc->c);
230     cc->c = NULL;
231   }
232   GNUNET_free (cc);
233 }
234
235 /**
236  * Stop sensor reporting module
237  */
238 void SENSOR_reporting_stop ()
239 {
240   struct ReportingContext *rc;
241   struct CadetChannelContext *cc;
242
243   LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor reporting module.\n");
244   while (NULL != cc_head)
245   {
246     cc = cc_head;
247     GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
248     destroy_cadet_channel_context (cc);
249   }
250   while (NULL != rc_head)
251   {
252     rc = rc_head;
253     GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, rc);
254     destroy_reporting_context (rc);
255   }
256   if (NULL != peerstore)
257   {
258     GNUNET_PEERSTORE_disconnect (peerstore);
259     peerstore = NULL;
260   }
261   if (NULL != cadet)
262   {
263     GNUNET_CADET_disconnect (cadet);
264     cadet = NULL;
265   }
266 }
267
268 /**
269  * Returns CADET channel established to given peer
270  * or creates a new one
271  *
272  * @param pid Peer Identity
273  * @return Context of established cadet channel
274  */
275 static struct CadetChannelContext *
276 get_cadet_channel (struct GNUNET_PeerIdentity pid)
277 {
278   struct CadetChannelContext *cc;
279
280   cc = cc_head;
281   while (NULL != cc)
282   {
283     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cc->pid))
284       return cc;
285     cc = cc->next;
286   }
287   cc = GNUNET_new (struct CadetChannelContext);
288   cc->c = GNUNET_CADET_channel_create(cadet,
289       cc,
290       &pid,
291       GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
292       GNUNET_CADET_OPTION_DEFAULT);
293   cc->pid = pid;
294   cc->sending = GNUNET_NO;
295   cc->destroying = GNUNET_NO;
296   GNUNET_CONTAINER_DLL_insert (cc_head, cc_tail, cc);
297   return cc;
298 }
299
300 /**
301  * Construct a reading message ready to be sent over CADET channel
302  *
303  * @param rc reporting context to read data from
304  * @param msg used to return the created message structure
305  * @return size of created message
306  */
307 static size_t
308 construct_reading_message (struct ReportingContext *rc,
309     struct GNUNET_SENSOR_ReadingMessage **msg)
310 {
311   struct GNUNET_SENSOR_ReadingMessage *ret;
312   uint16_t sensorname_size;
313   uint16_t total_size;
314   void *dummy;
315
316   sensorname_size = strlen (rc->sensor->name) + 1;
317   total_size = sizeof(struct GNUNET_SENSOR_ReadingMessage) +
318       sensorname_size +
319       rc->last_value_size;
320   ret = GNUNET_malloc (total_size);
321   ret->header.size = htons (total_size);
322   ret->header.type = htons (GNUNET_MESSAGE_TYPE_SENSOR_READING);
323   ret->sensorname_size = htons (sensorname_size);
324   ret->sensorversion_major = htons (rc->sensor->version_major);
325   ret->sensorversion_minor = htons (rc->sensor->version_minor);
326   ret->timestamp = GNUNET_htobe64 (rc->timestamp);
327   ret->value_size = htons (rc->last_value_size);
328   dummy = &ret[1];
329   memcpy (dummy, rc->sensor->name, sensorname_size);
330   dummy += sensorname_size;
331   memcpy (dummy, rc->last_value, rc->last_value_size);
332   *msg = ret;
333   return total_size;
334 }
335
336 /**
337  * Function called to notify a client about the connection begin ready
338  * to queue more data.  @a buf will be NULL and @a size zero if the
339  * connection was closed for writing in the meantime.
340  *
341  * @param cls closure
342  * @param size number of bytes available in @a buf
343  * @param buf where the callee should write the message
344  * @return number of bytes written to @a buf
345  */
346 static size_t
347 do_report_collection_point (void *cls, size_t size, void *buf)
348 {
349   struct CadetChannelContext *cc = cls;
350   size_t written = 0;
351
352   cc->th = NULL;
353   cc->sending = GNUNET_NO;
354   LOG (GNUNET_ERROR_TYPE_DEBUG, "Copying to CADET transmit buffer.\n");
355   if (NULL == buf)
356   {
357     LOG (GNUNET_ERROR_TYPE_WARNING,
358         "CADET failed to transmit message (NULL buf), discarding.\n");
359   }
360   else if (size < cc->pending_msg_size)
361   {
362     LOG (GNUNET_ERROR_TYPE_WARNING,
363         "CADET failed to transmit message (small size, expected: %u, got: %u)"
364         ", discarding.\n", cc->pending_msg_size, size);
365   }
366   else
367   {
368     memcpy (buf, cc->pending_msg, cc->pending_msg_size);
369     written = cc->pending_msg_size;
370   }
371   GNUNET_free (cc->pending_msg);
372   cc->pending_msg = NULL;
373   cc->pending_msg_size = 0;
374   return written;
375 }
376
377 /**
378  * Task scheduled to send values to collection point
379  *
380  * @param cls closure, a 'struct CollectionReportingContext *'
381  * @param tc unused
382  */
383 static void report_collection_point
384 (void *cls, const struct GNUNET_SCHEDULER_TaskContext* tc)
385 {
386   struct ReportingContext *rc = cls;
387   struct SensorInfo *sensor = rc->sensor;
388   struct CadetChannelContext *cc;
389   struct GNUNET_SENSOR_ReadingMessage *msg;
390   size_t msg_size;
391
392   rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
393   if (0 == rc->last_value_size) /* Did not receive a sensor value yet */
394   {
395     LOG (GNUNET_ERROR_TYPE_WARNING, "Did not receive a value from `%s' "
396         "to report yet.\n", rc->sensor->name);
397     rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
398             &report_collection_point, rc);
399     return;
400   }
401   LOG (GNUNET_ERROR_TYPE_DEBUG, "Now trying to report last seen value of `%s' "
402       "to collection point.\n", rc->sensor->name);
403   GNUNET_assert (NULL != sensor->collection_point);
404   cc = get_cadet_channel (*sensor->collection_point);
405   if (GNUNET_YES == cc->sending)
406   {
407     LOG (GNUNET_ERROR_TYPE_DEBUG,
408         "Cadet channel to collection point busy, "
409         "trying again for sensor `%s' after %d seconds.\n", rc->sensor->name,
410         COLLECTION_RETRY);
411     rc->cp_task = GNUNET_SCHEDULER_add_delayed (
412       GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, COLLECTION_RETRY),
413       &report_collection_point, rc);
414     return;
415   }
416   msg_size = construct_reading_message (rc, &msg);
417   cc->sending = GNUNET_YES;
418   cc->pending_msg = msg;
419   cc->pending_msg_size = msg_size;
420   cc->th = GNUNET_CADET_notify_transmit_ready (cc->c,
421       GNUNET_YES,
422       sensor->collection_interval,
423       msg_size,
424       &do_report_collection_point,
425       cc);
426   rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
427       &report_collection_point, rc);
428 }
429
430 /*
431  * Sensor value watch callback
432  */
433 static int
434 sensor_watch_cb (void *cls,
435     struct GNUNET_PEERSTORE_Record *record,
436     char *emsg)
437 {
438   struct ReportingContext *rc = cls;
439
440   if (NULL != emsg)
441     return GNUNET_YES;
442   if (NULL != rc->last_value)
443   {
444     GNUNET_free (rc->last_value);
445     rc->last_value_size = 0;
446   }
447   rc->last_value = GNUNET_malloc(record->value_size);
448   memcpy (rc->last_value, record->value, record->value_size);
449   rc->last_value_size = record->value_size;
450   rc->timestamp = GNUNET_TIME_absolute_get().abs_value_us;
451   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a sensor `%s' watch value at "
452       "timestamp %" PRIu64 ", updating notification last_value.\n",
453       rc->sensor->name, rc->timestamp);
454   return GNUNET_YES;
455 }
456
457 /**
458  * Iterator for defined sensors
459  * Watches sensors for readings to report
460  *
461  * @param cls unused
462  * @param key unused
463  * @param value a 'struct SensorInfo *' with sensor information
464  * @return #GNUNET_YES to continue iterations
465  */
466 static int
467 init_sensor_reporting (void *cls,
468     const struct GNUNET_HashCode *key,
469     void *value)
470 {
471   struct SensorInfo *sensor = value;
472   struct ReportingContext *rc;
473
474   if (NULL == sensor->collection_point &&
475       GNUNET_NO == sensor->p2p_report)
476     return GNUNET_YES;
477   rc = GNUNET_new (struct ReportingContext);
478   rc->sensor = sensor;
479   rc->last_value = NULL;
480   rc->last_value_size = 0;
481   rc->wc = GNUNET_PEERSTORE_watch(peerstore,
482       "sensor",
483       &mypeerid,
484       sensor->name,
485       &sensor_watch_cb,
486       rc);
487   if (NULL != sensor->collection_point)
488   {
489     LOG (GNUNET_ERROR_TYPE_INFO,
490         "Will start reporting sensor `%s' values to "
491         "collection point `%s' every %s.\n",
492         sensor->name, GNUNET_i2s_full(sensor->collection_point),
493         GNUNET_STRINGS_relative_time_to_string(sensor->collection_interval,
494             GNUNET_YES));
495     rc->cp_task =
496         GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
497             &report_collection_point,
498             rc);
499   }
500   if (GNUNET_YES == sensor->p2p_report)
501   {
502     LOG (GNUNET_ERROR_TYPE_INFO,
503         "Will start reporting sensor `%s' values to p2p network every %s.\n",
504         sensor->name,
505         GNUNET_STRINGS_relative_time_to_string(sensor->p2p_interval,
506             GNUNET_YES));
507   }
508   GNUNET_CONTAINER_DLL_insert (rc_head, rc_tail, rc);
509   return GNUNET_YES;
510 }
511
512 /**
513  * Function called whenever a channel is destroyed.  Should clean up
514  * any associated state.
515  *
516  * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
517  *
518  * @param cls closure (set from #GNUNET_CADET_connect)
519  * @param channel connection to the other end (henceforth invalid)
520  * @param channel_ctx place where local state associated
521  *                   with the channel is stored
522  */
523 static void cadet_channel_destroyed (void *cls,
524     const struct GNUNET_CADET_Channel *channel,
525     void *channel_ctx)
526 {
527   struct CadetChannelContext *cc = channel_ctx;
528
529   if (GNUNET_YES == cc->destroying)
530     return;
531   LOG (GNUNET_ERROR_TYPE_DEBUG,
532       "Received a `channel destroyed' notification from CADET, "
533       "cleaning up.\n");
534   GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
535   cc->c = NULL;
536   destroy_cadet_channel_context (cc);
537 }
538
539 /**
540  * Start the sensor reporting module
541  *
542  * @param c our service configuration
543  * @param sensors multihashmap of loaded sensors
544  * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
545  */
546 int
547 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
548     struct GNUNET_CONTAINER_MultiHashMap *sensors)
549 {
550   static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
551       {NULL, 0, 0}
552   };
553
554   LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n");
555   GNUNET_assert(NULL != sensors);
556   cfg = c;
557   peerstore = GNUNET_PEERSTORE_connect(cfg);
558   if (NULL == peerstore)
559   {
560     LOG (GNUNET_ERROR_TYPE_ERROR,
561         _("Failed to connect to peerstore service.\n"));
562     SENSOR_reporting_stop ();
563     return GNUNET_SYSERR;
564   }
565   cadet = GNUNET_CADET_connect(cfg,
566       NULL,
567       NULL,
568       &cadet_channel_destroyed,
569       cadet_handlers,
570       NULL);
571   if (NULL == cadet)
572   {
573     LOG (GNUNET_ERROR_TYPE_ERROR,
574         _("Failed to connect to CADET service.\n"));
575     SENSOR_reporting_stop ();
576     return GNUNET_SYSERR;
577   }
578   GNUNET_CRYPTO_get_peer_identity(cfg, &mypeerid);
579   GNUNET_CONTAINER_multihashmap_iterate(sensors, &init_sensor_reporting, NULL);
580
581   return GNUNET_OK;
582 }
583
584 /* end of gnunet-service-sensor-reporting.c */