2 This file is part of GNUnet.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file sensor/gnunet-service-sensor_reporting_value.c
23 * @brief sensor service value reporting functionality
24 * @author Omar Tarabai
28 #include "gnunet_util_lib.h"
30 #include "gnunet_peerstore_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
34 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting-value",__VA_ARGS__)
37 * Retry interval (seconds) in case channel to collection point is busy
39 #define COLLECTION_RETRY 1
42 * Context of reporting sensor values
44 struct ValueReportingContext
50 struct ValueReportingContext *prev;
55 struct ValueReportingContext *next;
60 struct GNUNET_SENSOR_SensorInfo *sensor;
63 * Collection point reporting task
64 * (or #GNUNET_SCHEDULER_NO_TASK)
66 GNUNET_SCHEDULER_TaskIdentifier cp_task;
69 * Watcher of sensor values
71 struct GNUNET_PEERSTORE_WatchContext *wc;
74 * Last value read from sensor
79 * Size of @e last_value
81 size_t last_value_size;
84 * Timestamp of last value reading
91 * Context of a created CADET channel
93 struct CadetChannelContext
99 struct CadetChannelContext *prev;
104 struct CadetChannelContext *next;
109 struct GNUNET_PeerIdentity pid;
112 * CADET channel handle
114 struct GNUNET_CADET_Channel *c;
117 * Are we sending data on this channel?
118 * #GNUNET_YES / #GNUNET_NO
123 * Pointer to a pending message to be sent over the channel
128 * Size of @e pending_msg
130 size_t pending_msg_size;
133 * Handle to CADET tranmission request in case we are sending
134 * (sending == #GNUNET_YES)
136 struct GNUNET_CADET_TransmitHandle *th;
139 * Are we currently destroying the channel and its context?
148 static const struct GNUNET_CONFIGURATION_Handle *cfg;
151 * Handle to peerstore service
153 static struct GNUNET_PEERSTORE_Handle *peerstore;
158 static struct GNUNET_PeerIdentity mypeerid;
161 * Handle to CADET service
163 static struct GNUNET_CADET_Handle *cadet;
166 * Head of DLL of all reporting contexts
168 struct ValueReportingContext *vrc_head;
171 * Tail of DLL of all reporting contexts
173 struct ValueReportingContext *vrc_tail;
176 * Head of DLL of all cadet channels
178 struct CadetChannelContext *cc_head;
181 * Tail of DLL of all cadet channels
183 struct CadetChannelContext *cc_tail;
186 * Destroy a reporting context structure
189 destroy_value_reporting_context (struct ValueReportingContext *vrc)
193 GNUNET_PEERSTORE_watch_cancel (vrc->wc);
196 if (GNUNET_SCHEDULER_NO_TASK != vrc->cp_task)
198 GNUNET_SCHEDULER_cancel (vrc->cp_task);
199 vrc->cp_task = GNUNET_SCHEDULER_NO_TASK;
201 if (NULL != vrc->last_value)
203 GNUNET_free (vrc->last_value);
204 vrc->last_value_size = 0;
211 * Destroy a CADET channel context struct
214 destroy_cadet_channel_context (struct CadetChannelContext *cc)
216 cc->destroying = GNUNET_YES;
219 GNUNET_CADET_notify_transmit_ready_cancel (cc->th);
222 if (NULL != cc->pending_msg)
224 GNUNET_free (cc->pending_msg);
225 cc->pending_msg = NULL;
229 GNUNET_CADET_channel_destroy (cc->c);
237 * Stop sensor value reporting module
240 SENSOR_reporting_value_stop ()
242 struct ValueReportingContext *vrc;
243 struct CadetChannelContext *cc;
245 LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor value reporting module.\n");
246 while (NULL != cc_head)
249 GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
250 destroy_cadet_channel_context (cc);
252 while (NULL != vrc_head)
255 GNUNET_CONTAINER_DLL_remove (vrc_head, vrc_tail, vrc);
256 destroy_value_reporting_context (vrc);
258 if (NULL != peerstore)
260 GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_YES);
265 GNUNET_CADET_disconnect (cadet);
272 * Returns CADET channel established to given peer or creates a new one.
274 * @param pid Peer Identity
275 * @return Context of established cadet channel
277 static struct CadetChannelContext *
278 get_cadet_channel (struct GNUNET_PeerIdentity pid)
280 struct CadetChannelContext *cc;
285 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cc->pid))
289 cc = GNUNET_new (struct CadetChannelContext);
291 GNUNET_CADET_channel_create (cadet, cc, &pid,
292 GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
293 GNUNET_CADET_OPTION_DEFAULT);
295 cc->sending = GNUNET_NO;
296 cc->destroying = GNUNET_NO;
297 GNUNET_CONTAINER_DLL_insert (cc_head, cc_tail, cc);
303 * Construct a reading message ready to be sent over CADET channel
305 * @param rc reporting context to read data from
306 * @param msg used to return the created message structure
307 * @return size of created message
310 construct_reading_message (struct ValueReportingContext *vrc,
311 struct GNUNET_SENSOR_ReadingMessage **msg)
313 struct GNUNET_SENSOR_ReadingMessage *ret;
314 uint16_t sensorname_size;
318 sensorname_size = strlen (vrc->sensor->name) + 1;
320 sizeof (struct GNUNET_SENSOR_ReadingMessage) + sensorname_size +
321 vrc->last_value_size;
322 ret = GNUNET_malloc (total_size);
323 ret->header.size = htons (total_size);
324 ret->header.type = htons (GNUNET_MESSAGE_TYPE_SENSOR_READING);
325 ret->sensorname_size = htons (sensorname_size);
326 ret->sensorversion_major = htons (vrc->sensor->version_major);
327 ret->sensorversion_minor = htons (vrc->sensor->version_minor);
328 ret->timestamp = GNUNET_htobe64 (vrc->timestamp);
329 ret->value_size = htons (vrc->last_value_size);
331 memcpy (dummy, vrc->sensor->name, sensorname_size);
332 dummy += sensorname_size;
333 memcpy (dummy, vrc->last_value, vrc->last_value_size);
340 * Function called to notify a client about the connection begin ready
341 * to queue more data. @a buf will be NULL and @a size zero if the
342 * connection was closed for writing in the meantime.
345 * @param size number of bytes available in @a buf
346 * @param buf where the callee should write the message
347 * @return number of bytes written to @a buf
350 do_report_value (void *cls, size_t size, void *buf)
352 struct CadetChannelContext *cc = cls;
356 cc->sending = GNUNET_NO;
357 LOG (GNUNET_ERROR_TYPE_DEBUG, "Copying to CADET transmit buffer.\n");
360 LOG (GNUNET_ERROR_TYPE_WARNING,
361 "CADET failed to transmit message (NULL buf), discarding.\n");
363 else if (size < cc->pending_msg_size)
365 LOG (GNUNET_ERROR_TYPE_WARNING,
366 "CADET failed to transmit message (small size, expected: %u, got: %u)"
367 ", discarding.\n", cc->pending_msg_size, size);
371 memcpy (buf, cc->pending_msg, cc->pending_msg_size);
372 written = cc->pending_msg_size;
374 GNUNET_free (cc->pending_msg);
375 cc->pending_msg = NULL;
376 cc->pending_msg_size = 0;
382 * Task scheduled to send values to collection point
384 * @param cls closure, a `struct ValueReportingContext *`
388 report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
390 struct ValueReportingContext *vrc = cls;
391 struct GNUNET_SENSOR_SensorInfo *sensor = vrc->sensor;
392 struct CadetChannelContext *cc;
393 struct GNUNET_SENSOR_ReadingMessage *msg;
396 vrc->cp_task = GNUNET_SCHEDULER_NO_TASK;
397 if (0 == vrc->last_value_size) /* Did not receive a sensor value yet */
399 LOG (GNUNET_ERROR_TYPE_WARNING,
400 "Did not receive a value from `%s' to report yet.\n",
403 GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
407 LOG (GNUNET_ERROR_TYPE_DEBUG,
408 "Now trying to report last seen value of `%s' " "to collection point.\n",
410 GNUNET_assert (NULL != sensor->collection_point);
411 cc = get_cadet_channel (*sensor->collection_point);
412 if (GNUNET_YES == cc->sending)
414 LOG (GNUNET_ERROR_TYPE_DEBUG,
415 "Cadet channel to collection point busy, "
416 "trying again for sensor `%s' after %d seconds.\n", vrc->sensor->name,
419 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
420 (GNUNET_TIME_UNIT_SECONDS,
421 COLLECTION_RETRY), &report_value, vrc);
424 msg_size = construct_reading_message (vrc, &msg);
425 cc->sending = GNUNET_YES;
426 cc->pending_msg = msg;
427 cc->pending_msg_size = msg_size;
429 GNUNET_CADET_notify_transmit_ready (cc->c, GNUNET_YES,
430 sensor->value_reporting_interval,
431 msg_size, &do_report_value, cc);
433 GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
439 * Sensor value watch callback
442 value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
444 struct ValueReportingContext *vrc = cls;
448 if (NULL != vrc->last_value)
450 GNUNET_free (vrc->last_value);
451 vrc->last_value_size = 0;
453 vrc->last_value = GNUNET_malloc (record->value_size);
454 memcpy (vrc->last_value, record->value, record->value_size);
455 vrc->last_value_size = record->value_size;
456 vrc->timestamp = GNUNET_TIME_absolute_get ().abs_value_us;
457 LOG (GNUNET_ERROR_TYPE_DEBUG,
458 "Received a sensor `%s' watch value at " "timestamp %" PRIu64
459 ", updating notification last_value.\n", vrc->sensor->name,
466 * Function called whenever a channel is destroyed. Should clean up
467 * any associated state.
469 * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
471 * @param cls closure (set from #GNUNET_CADET_connect)
472 * @param channel connection to the other end (henceforth invalid)
473 * @param channel_ctx place where local state associated
474 * with the channel is stored
477 cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel,
480 struct CadetChannelContext *cc = channel_ctx;
482 if (GNUNET_YES == cc->destroying)
484 LOG (GNUNET_ERROR_TYPE_DEBUG,
485 "Received a `channel destroyed' notification from CADET, "
487 GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
489 destroy_cadet_channel_context (cc);
494 * Iterator for defined sensors
495 * Watches sensors for readings to report
499 * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information
500 * @return #GNUNET_YES to continue iterations
503 init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
506 struct GNUNET_SENSOR_SensorInfo *sensor = value;
507 struct ValueReportingContext *vrc;
509 if (NULL == sensor->collection_point || GNUNET_NO == sensor->report_values)
511 LOG (GNUNET_ERROR_TYPE_INFO,
512 "Reporting sensor `%s' values to collection point `%s' every %s.\n",
513 sensor->name, GNUNET_i2s_full (sensor->collection_point),
514 GNUNET_STRINGS_relative_time_to_string (sensor->value_reporting_interval,
516 vrc = GNUNET_new (struct ValueReportingContext);
517 vrc->sensor = sensor;
518 vrc->last_value = NULL;
519 vrc->last_value_size = 0;
521 GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
522 &value_watch_cb, vrc);
524 GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
526 GNUNET_CONTAINER_DLL_insert (vrc_head, vrc_tail, vrc);
532 * Start the sensor value reporting module
534 * @param c our service configuration
535 * @param sensors multihashmap of loaded sensors
536 * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
539 SENSOR_reporting_value_start (const struct GNUNET_CONFIGURATION_Handle *c,
540 struct GNUNET_CONTAINER_MultiHashMap *sensors)
542 static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
546 LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor value reporting module.\n");
547 GNUNET_assert (NULL != sensors);
549 peerstore = GNUNET_PEERSTORE_connect (cfg);
550 if (NULL == peerstore)
552 LOG (GNUNET_ERROR_TYPE_ERROR,
553 _("Failed to connect to peerstore service.\n"));
554 SENSOR_reporting_value_stop ();
555 return GNUNET_SYSERR;
558 GNUNET_CADET_connect (cfg, NULL, NULL, &cadet_channel_destroyed,
559 cadet_handlers, NULL);
562 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CADET service.\n"));
563 SENSOR_reporting_value_stop ();
564 return GNUNET_SYSERR;
566 GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid);
567 GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL);
571 /* end of gnunet-service-sensor_reporting_value.c */