2 This file is part of GNUnet.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file sensor/gnunet-service-sensor-reporting.c
23 * @brief sensor service reporting functionality
24 * @author Omar Tarabai
28 #include "gnunet_util_lib.h"
30 #include "gnunet_peerstore_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
/* Module-local logging shorthand: forwards to GNUNET_log_from() with the
 * fixed component name "sensor-reporting". */
34 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
37 * Retry interval (seconds) in case channel to collection point is busy
39 #define COLLECTION_RETRY 1
42 * Context of reporting operations
/* Per-sensor reporting state. An instance is allocated by
 * init_sensor_reporting(), linked into the rc_head/rc_tail list and torn
 * down by destroy_reporting_context(). */
44 struct ReportingContext
/* List links for the rc_head/rc_tail doubly linked list. */
50 struct ReportingContext *prev;
55 struct ReportingContext *next;
/* Sensor this context reports on; its name, version, collection point and
 * collection interval are read when a report is built and scheduled. */
60 struct SensorInfo *sensor;
63 * Collection point reporting task
64 * (or #GNUNET_SCHEDULER_NO_TASK)
66 GNUNET_SCHEDULER_TaskIdentifier cp_task;
69 * Watcher of sensor values
71 struct GNUNET_PEERSTORE_WatchContext *wc;
/* The cached value below is a private copy made by sensor_watch_cb() from
 * the peerstore record it receives. */
74 * Last value read from sensor
79 * Size of @e last_value
81 size_t last_value_size;
/* Set by sensor_watch_cb() from GNUNET_TIME_absolute_get() when the value
 * is cached; sent in the reading message via GNUNET_htobe64(). */
84 * Timestamp of last value reading
91 * Context of a created CADET channel
/* State kept for one CADET channel to a remote peer. Contexts are created
 * by get_cadet_channel(), kept in the cc_head/cc_tail list and torn down by
 * destroy_cadet_channel_context(). A context holds a single queued message
 * (pending_msg / pending_msg_size); report_collection_point() does not
 * queue another one while sending is GNUNET_YES. */
93 struct CadetChannelContext
/* List links for the cc_head/cc_tail doubly linked list. */
99 struct CadetChannelContext *prev;
104 struct CadetChannelContext *next;
/* Identity that get_cadet_channel() compares against when looking for an
 * existing channel to a peer. */
109 struct GNUNET_PeerIdentity pid;
112 * CADET channel handle
114 struct GNUNET_CADET_Channel *c;
117 * Are we sending data on this channel?
118 * #GNUNET_YES / #GNUNET_NO
/* The pending message is built by construct_reading_message() and freed in
 * do_report_collection_point() or destroy_cadet_channel_context(). */
123 * Pointer to a pending message to be sent over the channel
128 * Size of @e pending_msg
130 size_t pending_msg_size;
133 * Handle to CADET tranmission request in case we are sending
134 * (sending == #GNUNET_YES)
136 struct GNUNET_CADET_TransmitHandle *th;
/* Set to GNUNET_YES at the start of destroy_cadet_channel_context() and
 * tested by cadet_channel_destroyed(). */
139 * Are we currently destroying the channel and its context?
148 static const struct GNUNET_CONFIGURATION_Handle *cfg;
151 * Handle to peerstore service
153 static struct GNUNET_PEERSTORE_Handle *peerstore;
158 static struct GNUNET_PeerIdentity mypeerid;
161 * Handle to CADET service
163 static struct GNUNET_CADET_Handle *cadet;
166 * Head of DLL of all reporting contexts
168 struct ReportingContext *rc_head;
171 * Tail of DLL of all reporting contexts
173 struct ReportingContext *rc_tail;
176 * Head of DLL of all cadet channels
178 struct CadetChannelContext *cc_head;
181 * Tail of DLL of all cadet channels
183 struct CadetChannelContext *cc_tail;
187 * Destroy a reporting context structure
/* Release what a reporting context holds: its peerstore watch, its
 * scheduled collection-point task and its cached last value.
 *
 * rc: context to tear down. The caller in this file
 *     (SENSOR_reporting_stop()) unlinks it from rc_head/rc_tail first. */
190 destroy_reporting_context (struct ReportingContext *rc)
194 GNUNET_PEERSTORE_watch_cancel (rc->wc);
/* Only cancel the collection-point task if one is currently scheduled. */
197 if (GNUNET_SCHEDULER_NO_TASK != rc->cp_task)
199 GNUNET_SCHEDULER_cancel(rc->cp_task);
200 rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
/* Drop the cached reading.
 * NOTE(review): only the size is reset here, the pointer is not set to
 * NULL -- harmless only if rc itself is released right afterwards;
 * confirm. */
202 if (NULL != rc->last_value)
204 GNUNET_free (rc->last_value);
205 rc->last_value_size = 0;
211 * Destroy a CADET channel context struct
/* Tear down a CADET channel context: cancel its transmit request, free a
 * still-queued message and destroy the channel.
 *
 * cc: context to tear down. Both callers in this file
 *     (SENSOR_reporting_stop() and cadet_channel_destroyed()) unlink it
 *     from cc_head/cc_tail before calling. */
214 destroy_cadet_channel_context (struct CadetChannelContext *cc)
/* Raise the flag before touching the channel; cadet_channel_destroyed()
 * tests it -- presumably so that a destroy notification caused by the
 * teardown below is not handled a second time; confirm. */
216 cc->destroying = GNUNET_YES;
219 GNUNET_CADET_notify_transmit_ready_cancel (cc->th);
/* A message that was queued but never copied out is still owned by us. */
222 if (NULL != cc->pending_msg)
224 GNUNET_free (cc->pending_msg);
225 cc->pending_msg = NULL;
229 GNUNET_CADET_channel_destroy (cc->c);
237 * Stop sensor reporting module
/* Shut the reporting module down: destroy every CADET channel context and
 * every reporting context, then disconnect from the peerstore and CADET
 * services. SENSOR_reporting_start() also calls this to unwind when one of
 * its service connections fails. */
240 SENSOR_reporting_stop ()
242 struct ReportingContext *rc;
243 struct CadetChannelContext *cc;
245 LOG (GNUNET_ERROR_TYPE_DEBUG,
246 "Stopping sensor reporting module.\n");
/* Drain the channel list: unlink each context, then destroy it. */
247 while (NULL != cc_head)
250 GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
251 destroy_cadet_channel_context (cc);
/* Same for the per-sensor reporting contexts. */
253 while (NULL != rc_head)
256 GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, rc);
257 destroy_reporting_context (rc);
/* peerstore is NULL when this runs from the peerstore failure path of
 * SENSOR_reporting_start(). */
259 if (NULL != peerstore)
261 GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_YES);
266 GNUNET_CADET_disconnect (cadet);
273 * Returns CADET channel established to given peer
274 * or creates a new one
276 * @param pid Peer Identity
277 * @return Context of established cadet channel
/* Find or create the channel context for a peer. A new context gets a
 * CADET channel created for GNUNET_APPLICATION_TYPE_SENSORDASHBOARD with
 * GNUNET_CADET_OPTION_DEFAULT and is added to the cc_head/cc_tail list.
 * Note that pid is taken by value, so each call copies the identity. */
279 static struct CadetChannelContext *
280 get_cadet_channel (struct GNUNET_PeerIdentity pid)
282 struct CadetChannelContext *cc;
/* An existing context matches when its stored identity compares equal
 * to pid. */
287 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cc->pid))
/* Build a fresh context and open its channel. */
291 cc = GNUNET_new (struct CadetChannelContext);
292 cc->c = GNUNET_CADET_channel_create(cadet,
295 GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
296 GNUNET_CADET_OPTION_DEFAULT);
/* A new channel starts idle and is not being torn down. */
298 cc->sending = GNUNET_NO;
299 cc->destroying = GNUNET_NO;
300 GNUNET_CONTAINER_DLL_insert (cc_head, cc_tail, cc);
306 * Construct a reading message ready to be sent over CADET channel
308 * @param rc reporting context to read data from
309 * @param msg used to return the created message structure
310 * @return size of created message
/* Build a GNUNET_SENSOR_ReadingMessage from the value cached in rc. The
 * header fields are stored in network byte order; the sensor name
 * (including its terminating 0 byte) and the raw value bytes are then
 * copied back to back through the dummy cursor.
 *
 * NOTE(review): sensorname_size is a uint16_t, and header.size and
 * value_size go through htons(), so the name length, the value length and
 * the total are assumed to fit in 16 bits -- confirm they are bounded
 * before this function is reached. */
313 construct_reading_message (struct ReportingContext *rc,
314 struct GNUNET_SENSOR_ReadingMessage **msg)
316 struct GNUNET_SENSOR_ReadingMessage *ret;
317 uint16_t sensorname_size;
/* +1 so the terminating 0 byte travels with the name. */
321 sensorname_size = strlen (rc->sensor->name) + 1;
322 total_size = sizeof(struct GNUNET_SENSOR_ReadingMessage) +
325 ret = GNUNET_malloc (total_size);
326 ret->header.size = htons (total_size);
327 ret->header.type = htons (GNUNET_MESSAGE_TYPE_SENSOR_READING);
328 ret->sensorname_size = htons (sensorname_size);
329 ret->sensorversion_major = htons (rc->sensor->version_major);
330 ret->sensorversion_minor = htons (rc->sensor->version_minor);
/* rc->timestamp is the time recorded by sensor_watch_cb() when the value
 * was cached. */
331 ret->timestamp = GNUNET_htobe64 (rc->timestamp);
332 ret->value_size = htons (rc->last_value_size);
/* Variable part: name first, value directly after it. */
334 memcpy (dummy, rc->sensor->name, sensorname_size);
335 dummy += sensorname_size;
336 memcpy (dummy, rc->last_value, rc->last_value_size);
343 * Function called to notify a client about the connection being ready
344 * to queue more data. @a buf will be NULL and @a size zero if the
345 * connection was closed for writing in the meantime.
348 * @param size number of bytes available in @a buf
349 * @param buf where the callee should write the message
350 * @return number of bytes written to @a buf
/* CADET transmit-ready callback registered by report_collection_point().
 * Copies the channel's pending message into buf when the offered size is
 * large enough and reports its length as written; the warning paths log
 * that the message is discarded instead. The pending message is then
 * freed and its slot cleared.
 *
 * cls: used as the struct CadetChannelContext of the channel. */
353 do_report_collection_point (void *cls, size_t size, void *buf)
355 struct CadetChannelContext *cc = cls;
/* The transmit request has fired, so the channel is free for the next
 * report; report_collection_point() checks this flag. */
359 cc->sending = GNUNET_NO;
360 LOG (GNUNET_ERROR_TYPE_DEBUG,
361 "Copying to CADET transmit buffer.\n");
364 LOG (GNUNET_ERROR_TYPE_WARNING,
365 "CADET failed to transmit message (NULL buf), discarding.\n");
367 else if (size < cc->pending_msg_size)
/* NOTE(review): both arguments of this log call are size_t
 * (pending_msg_size is declared size_t, size is a size_t parameter) but
 * the format uses %u -- a format/argument mismatch where size_t is wider
 * than unsigned int; consider %zu or an explicit cast. */
369 LOG (GNUNET_ERROR_TYPE_WARNING,
370 "CADET failed to transmit message (small size, expected: %u, got: %u)"
371 ", discarding.\n", cc->pending_msg_size, size);
/* Enough room: hand the whole message over and report its length. */
375 memcpy (buf, cc->pending_msg, cc->pending_msg_size);
376 written = cc->pending_msg_size;
/* Release the queued message and clear the slot. */
378 GNUNET_free (cc->pending_msg);
379 cc->pending_msg = NULL;
380 cc->pending_msg_size = 0;
386 * Task scheduled to send values to collection point
388 * @param cls closure, a `struct ReportingContext *`
/* Scheduler task that reports the sensor's last cached value to its
 * collection point and re-arms itself:
 *  - no value cached yet: try again after collection_interval;
 *  - channel already sending: try again after COLLECTION_RETRY seconds;
 *  - otherwise: queue a reading message on the channel, request
 *    transmission and schedule the next run after collection_interval.
 *
 * cls: used as the struct ReportingContext of the sensor. */
392 report_collection_point (void *cls,
393 const struct GNUNET_SCHEDULER_TaskContext* tc)
395 struct ReportingContext *rc = cls;
396 struct SensorInfo *sensor = rc->sensor;
397 struct CadetChannelContext *cc;
398 struct GNUNET_SENSOR_ReadingMessage *msg;
/* This run consumes the scheduled task, so clear the stored id first. */
401 rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
402 if (0 == rc->last_value_size) /* Did not receive a sensor value yet */
404 LOG (GNUNET_ERROR_TYPE_WARNING,
405 "Did not receive a value from `%s' to report yet.\n",
407 rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
408 &report_collection_point, rc);
411 LOG (GNUNET_ERROR_TYPE_DEBUG,
412 "Now trying to report last seen value of `%s' "
413 "to collection point.\n",
415 GNUNET_assert (NULL != sensor->collection_point);
416 cc = get_cadet_channel (*sensor->collection_point);
/* The channel context has a single pending-message slot, so a report
 * cannot be queued while an earlier one is still being sent. */
417 if (GNUNET_YES == cc->sending)
419 LOG (GNUNET_ERROR_TYPE_DEBUG,
420 "Cadet channel to collection point busy, "
421 "trying again for sensor `%s' after %d seconds.\n",
424 rc->cp_task = GNUNET_SCHEDULER_add_delayed (
425 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, COLLECTION_RETRY),
426 &report_collection_point, rc);
/* Hand the freshly built message to the channel context;
 * do_report_collection_point() frees it. */
429 msg_size = construct_reading_message (rc, &msg);
430 cc->sending = GNUNET_YES;
431 cc->pending_msg = msg;
432 cc->pending_msg_size = msg_size;
433 cc->th = GNUNET_CADET_notify_transmit_ready (cc->c,
435 sensor->collection_interval,
437 &do_report_collection_point,
439 rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
440 &report_collection_point, rc);
445 * Sensor value watch callback
/* Peerstore watch callback: replaces the cached last value of the
 * reporting context with a private copy of the new record's value and
 * stamps it with the current time.
 *
 * cls:    used as the struct ReportingContext being updated.
 * record: record whose value / value_size are copied. */
448 sensor_watch_cb (void *cls,
449 struct GNUNET_PEERSTORE_Record *record,
452 struct ReportingContext *rc = cls;
/* Discard the previously cached value, if any. */
456 if (NULL != rc->last_value)
458 GNUNET_free (rc->last_value);
459 rc->last_value_size = 0;
/* Keep a private copy of the value bytes rather than a pointer into the
 * record. */
461 rc->last_value = GNUNET_malloc(record->value_size);
462 memcpy (rc->last_value, record->value, record->value_size);
463 rc->last_value_size = record->value_size;
/* The timestamp is the local time at which the value arrived
 * (abs_value_us of GNUNET_TIME_absolute_get()), not a field of the
 * record. */
464 rc->timestamp = GNUNET_TIME_absolute_get().abs_value_us;
465 LOG (GNUNET_ERROR_TYPE_DEBUG,
466 "Received a sensor `%s' watch value at "
467 "timestamp %" PRIu64 ", updating notification last_value.\n",
475 * Iterator for defined sensors
476 * Watches sensors for readings to report
480 * @param value a 'struct SensorInfo *' with sensor information
481 * @return #GNUNET_YES to continue iterations
/* Multihashmap iterator that SENSOR_reporting_start() runs over the loaded
 * sensors. Creates a reporting context for the sensor, starts a peerstore
 * watch for it and, when a collection point is configured, schedules the
 * first collection-point report after one collection_interval. The context
 * is added to the rc_head/rc_tail list.
 *
 * value: used as the struct SensorInfo of the sensor. */
484 init_sensor_reporting (void *cls,
485 const struct GNUNET_HashCode *key,
488 struct SensorInfo *sensor = value;
489 struct ReportingContext *rc;
/* Test for a sensor that has neither a collection point nor p2p
 * reporting enabled. */
491 if (NULL == sensor->collection_point &&
492 GNUNET_NO == sensor->p2p_report)
494 rc = GNUNET_new (struct ReportingContext);
496 rc->last_value = NULL;
497 rc->last_value_size = 0;
498 rc->wc = GNUNET_PEERSTORE_watch(peerstore,
/* Collection-point reporting: announce it and arm the first report. */
504 if (NULL != sensor->collection_point)
506 LOG (GNUNET_ERROR_TYPE_INFO,
507 "Will start reporting sensor `%s' values to "
508 "collection point `%s' every %s.\n",
509 sensor->name, GNUNET_i2s_full(sensor->collection_point),
510 GNUNET_STRINGS_relative_time_to_string(sensor->collection_interval,
513 GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
514 &report_collection_point,
/* p2p reporting: logs the configured p2p_interval. */
517 if (GNUNET_YES == sensor->p2p_report)
519 LOG (GNUNET_ERROR_TYPE_INFO,
520 "Will start reporting sensor `%s' values to p2p network every %s.\n",
522 GNUNET_STRINGS_relative_time_to_string(sensor->p2p_interval,
/* Track the context so SENSOR_reporting_stop() can find and destroy it. */
525 GNUNET_CONTAINER_DLL_insert (rc_head, rc_tail, rc);
531 * Function called whenever a channel is destroyed. Should clean up
532 * any associated state.
534 * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
536 * @param cls closure (set from #GNUNET_CADET_connect)
537 * @param channel connection to the other end (henceforth invalid)
538 * @param channel_ctx place where local state associated
539 * with the channel is stored
/* CADET callback for a channel that has been destroyed: unlinks the
 * channel's context from cc_head/cc_tail and tears it down.
 *
 * channel_ctx: used as the struct CadetChannelContext of the channel. */
542 cadet_channel_destroyed (void *cls,
543 const struct GNUNET_CADET_Channel *channel,
546 struct CadetChannelContext *cc = channel_ctx;
/* destroying is set by destroy_cadet_channel_context(); presumably this
 * test separates a teardown this module started from one reported by
 * CADET -- confirm. */
548 if (GNUNET_YES == cc->destroying)
550 LOG (GNUNET_ERROR_TYPE_DEBUG,
551 "Received a `channel destroyed' notification from CADET, "
553 GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
/* NOTE(review): destroy_cadet_channel_context() calls
 * GNUNET_CADET_channel_destroy (cc->c), which this callback must not do
 * on the channel being reported -- confirm cc->c is cleared before this
 * call. */
555 destroy_cadet_channel_context (cc);
559 * Start the sensor reporting module
561 * @param c our service configuration
562 * @param sensors multihashmap of loaded sensors
563 * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
/* Start the reporting module: connect to the peerstore and CADET services,
 * query the peer identity and run init_sensor_reporting() over every entry
 * of sensors. When either service connection fails, the module is unwound
 * through SENSOR_reporting_stop() and GNUNET_SYSERR is returned.
 *
 * sensors: multihashmap of loaded sensors; must not be NULL (asserted). */
566 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
567 struct GNUNET_CONTAINER_MultiHashMap *sensors)
569 static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
573 LOG (GNUNET_ERROR_TYPE_DEBUG,
574 "Starting sensor reporting module.\n");
575 GNUNET_assert(NULL != sensors);
/* Connect to the peerstore first; on failure unwind and report the
 * error. */
577 peerstore = GNUNET_PEERSTORE_connect(cfg);
578 if (NULL == peerstore)
580 LOG (GNUNET_ERROR_TYPE_ERROR,
581 _("Failed to connect to peerstore service.\n"));
582 SENSOR_reporting_stop ();
583 return GNUNET_SYSERR;
/* Then connect to CADET, passing cadet_channel_destroyed() to the connect
 * call. */
585 cadet = GNUNET_CADET_connect(cfg,
588 &cadet_channel_destroyed,
593 LOG (GNUNET_ERROR_TYPE_ERROR,
594 _("Failed to connect to CADET service.\n"));
595 SENSOR_reporting_stop ();
596 return GNUNET_SYSERR;
/* Both services are up: query the peer identity for this configuration,
 * then set up reporting for every loaded sensor. */
598 GNUNET_CRYPTO_get_peer_identity (cfg,
600 GNUNET_CONTAINER_multihashmap_iterate(sensors,
601 &init_sensor_reporting, NULL);
605 /* end of gnunet-service-sensor-reporting.c */