2 This file is part of GNUnet.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file sensor/gnunet-service-sensor-reporting.c
23 * @brief sensor service reporting functionality
24 * @author Omar Tarabai
28 #include "gnunet_util_lib.h"
30 #include "gnunet_peerstore_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
34 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
37 * Retry interval (seconds) in case channel to collection point is busy
39 #define COLLECTION_RETRY 1
42 * Context of reporting operations
44 struct ReportingContext
50 struct ReportingContext *prev;
55 struct ReportingContext *next;
60 struct SensorInfo *sensor;
63 * Collection point reporting task
64 * (OR GNUNET_SCHEDULER_NO_TASK)
66 GNUNET_SCHEDULER_TaskIdentifier cp_task;
69 * Watcher of sensor values
71 struct GNUNET_PEERSTORE_WatchContext *wc;
74 * Last value read from sensor
81 size_t last_value_size;
84 * Timestamp of last value reading
91 * Context of a created CADET channel
93 struct CadetChannelContext
99 struct CadetChannelContext *prev;
104 struct CadetChannelContext *next;
109 struct GNUNET_PeerIdentity pid;
112 * CADET channel handle
114 struct GNUNET_CADET_Channel *c;
117 * Are we sending data on this channel?
118 * #GNUNET_YES / #GNUNET_NO
123 * Pointer to a pending message to be sent over the channel
128 * Size of @pending_msg
130 size_t pending_msg_size;
133 * Handle to CADET tranmission request in case we are sending
134 * (sending == GNUNET_YES)
136 struct GNUNET_CADET_TransmitHandle *th;
139 * Are we currently destroying the channel and its context?
148 static const struct GNUNET_CONFIGURATION_Handle *cfg;
151 * Handle to peerstore service
153 static struct GNUNET_PEERSTORE_Handle *peerstore;
158 static struct GNUNET_PeerIdentity mypeerid;
161 * Handle to CADET service
163 static struct GNUNET_CADET_Handle *cadet;
166 * Head of DLL of all reporting contexts
168 struct ReportingContext *rc_head;
171 * Tail of DLL of all reporting contexts
173 struct ReportingContext *rc_tail;
176 * Head of DLL of all cadet channels
178 struct CadetChannelContext *cc_head;
181 * Tail of DLL of all cadet channels
183 struct CadetChannelContext *cc_tail;
187 * Destroy a reporting context structure
190 destroy_reporting_context (struct ReportingContext *rc)
194 GNUNET_PEERSTORE_watch_cancel (rc->wc);
197 if (GNUNET_SCHEDULER_NO_TASK != rc->cp_task)
199 GNUNET_SCHEDULER_cancel(rc->cp_task);
200 rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
202 if (NULL != rc->last_value)
204 GNUNET_free (rc->last_value);
205 rc->last_value_size = 0;
211 * Destroy a CADET channel context struct
214 destroy_cadet_channel_context (struct CadetChannelContext *cc)
216 cc->destroying = GNUNET_YES;
219 GNUNET_CADET_notify_transmit_ready_cancel (cc->th);
222 if (NULL != cc->pending_msg)
224 GNUNET_free (cc->pending_msg);
225 cc->pending_msg = NULL;
229 GNUNET_CADET_channel_destroy (cc->c);
236 * Stop sensor reporting module
238 void SENSOR_reporting_stop ()
240 struct ReportingContext *rc;
241 struct CadetChannelContext *cc;
243 LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor reporting module.\n");
244 while (NULL != cc_head)
247 GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
248 destroy_cadet_channel_context (cc);
250 while (NULL != rc_head)
253 GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, rc);
254 destroy_reporting_context (rc);
256 if (NULL != peerstore)
258 GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_YES);
263 GNUNET_CADET_disconnect (cadet);
269 * Returns CADET channel established to given peer
270 * or creates a new one
272 * @param pid Peer Identity
273 * @return Context of established cadet channel
275 static struct CadetChannelContext *
276 get_cadet_channel (struct GNUNET_PeerIdentity pid)
278 struct CadetChannelContext *cc;
283 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cc->pid))
287 cc = GNUNET_new (struct CadetChannelContext);
288 cc->c = GNUNET_CADET_channel_create(cadet,
291 GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
292 GNUNET_CADET_OPTION_DEFAULT);
294 cc->sending = GNUNET_NO;
295 cc->destroying = GNUNET_NO;
296 GNUNET_CONTAINER_DLL_insert (cc_head, cc_tail, cc);
301 * Construct a reading message ready to be sent over CADET channel
303 * @param rc reporting context to read data from
304 * @param msg used to return the created message structure
305 * @return size of created message
308 construct_reading_message (struct ReportingContext *rc,
309 struct GNUNET_SENSOR_ReadingMessage **msg)
311 struct GNUNET_SENSOR_ReadingMessage *ret;
312 uint16_t sensorname_size;
316 sensorname_size = strlen (rc->sensor->name) + 1;
317 total_size = sizeof(struct GNUNET_SENSOR_ReadingMessage) +
320 ret = GNUNET_malloc (total_size);
321 ret->header.size = htons (total_size);
322 ret->header.type = htons (GNUNET_MESSAGE_TYPE_SENSOR_READING);
323 ret->sensorname_size = htons (sensorname_size);
324 ret->sensorversion_major = htons (rc->sensor->version_major);
325 ret->sensorversion_minor = htons (rc->sensor->version_minor);
326 ret->timestamp = GNUNET_htobe64 (rc->timestamp);
327 ret->value_size = htons (rc->last_value_size);
329 memcpy (dummy, rc->sensor->name, sensorname_size);
330 dummy += sensorname_size;
331 memcpy (dummy, rc->last_value, rc->last_value_size);
337 * Function called to notify a client about the connection begin ready
338 * to queue more data. @a buf will be NULL and @a size zero if the
339 * connection was closed for writing in the meantime.
342 * @param size number of bytes available in @a buf
343 * @param buf where the callee should write the message
344 * @return number of bytes written to @a buf
347 do_report_collection_point (void *cls, size_t size, void *buf)
349 struct CadetChannelContext *cc = cls;
353 cc->sending = GNUNET_NO;
354 LOG (GNUNET_ERROR_TYPE_DEBUG, "Copying to CADET transmit buffer.\n");
357 LOG (GNUNET_ERROR_TYPE_WARNING,
358 "CADET failed to transmit message (NULL buf), discarding.\n");
360 else if (size < cc->pending_msg_size)
362 LOG (GNUNET_ERROR_TYPE_WARNING,
363 "CADET failed to transmit message (small size, expected: %u, got: %u)"
364 ", discarding.\n", cc->pending_msg_size, size);
368 memcpy (buf, cc->pending_msg, cc->pending_msg_size);
369 written = cc->pending_msg_size;
371 GNUNET_free (cc->pending_msg);
372 cc->pending_msg = NULL;
373 cc->pending_msg_size = 0;
378 * Task scheduled to send values to collection point
380 * @param cls closure, a 'struct CollectionReportingContext *'
383 static void report_collection_point
384 (void *cls, const struct GNUNET_SCHEDULER_TaskContext* tc)
386 struct ReportingContext *rc = cls;
387 struct SensorInfo *sensor = rc->sensor;
388 struct CadetChannelContext *cc;
389 struct GNUNET_SENSOR_ReadingMessage *msg;
392 rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
393 if (0 == rc->last_value_size) /* Did not receive a sensor value yet */
395 LOG (GNUNET_ERROR_TYPE_WARNING, "Did not receive a value from `%s' "
396 "to report yet.\n", rc->sensor->name);
397 rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
398 &report_collection_point, rc);
401 LOG (GNUNET_ERROR_TYPE_DEBUG, "Now trying to report last seen value of `%s' "
402 "to collection point.\n", rc->sensor->name);
403 GNUNET_assert (NULL != sensor->collection_point);
404 cc = get_cadet_channel (*sensor->collection_point);
405 if (GNUNET_YES == cc->sending)
407 LOG (GNUNET_ERROR_TYPE_DEBUG,
408 "Cadet channel to collection point busy, "
409 "trying again for sensor `%s' after %d seconds.\n", rc->sensor->name,
411 rc->cp_task = GNUNET_SCHEDULER_add_delayed (
412 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, COLLECTION_RETRY),
413 &report_collection_point, rc);
416 msg_size = construct_reading_message (rc, &msg);
417 cc->sending = GNUNET_YES;
418 cc->pending_msg = msg;
419 cc->pending_msg_size = msg_size;
420 cc->th = GNUNET_CADET_notify_transmit_ready (cc->c,
422 sensor->collection_interval,
424 &do_report_collection_point,
426 rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
427 &report_collection_point, rc);
431 * Sensor value watch callback
434 sensor_watch_cb (void *cls,
435 struct GNUNET_PEERSTORE_Record *record,
438 struct ReportingContext *rc = cls;
442 if (NULL != rc->last_value)
444 GNUNET_free (rc->last_value);
445 rc->last_value_size = 0;
447 rc->last_value = GNUNET_malloc(record->value_size);
448 memcpy (rc->last_value, record->value, record->value_size);
449 rc->last_value_size = record->value_size;
450 rc->timestamp = GNUNET_TIME_absolute_get().abs_value_us;
451 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a sensor `%s' watch value at "
452 "timestamp %" PRIu64 ", updating notification last_value.\n",
453 rc->sensor->name, rc->timestamp);
458 * Iterator for defined sensors
459 * Watches sensors for readings to report
463 * @param value a 'struct SensorInfo *' with sensor information
464 * @return #GNUNET_YES to continue iterations
467 init_sensor_reporting (void *cls,
468 const struct GNUNET_HashCode *key,
471 struct SensorInfo *sensor = value;
472 struct ReportingContext *rc;
474 if (NULL == sensor->collection_point &&
475 GNUNET_NO == sensor->p2p_report)
477 rc = GNUNET_new (struct ReportingContext);
479 rc->last_value = NULL;
480 rc->last_value_size = 0;
481 rc->wc = GNUNET_PEERSTORE_watch(peerstore,
487 if (NULL != sensor->collection_point)
489 LOG (GNUNET_ERROR_TYPE_INFO,
490 "Will start reporting sensor `%s' values to "
491 "collection point `%s' every %s.\n",
492 sensor->name, GNUNET_i2s_full(sensor->collection_point),
493 GNUNET_STRINGS_relative_time_to_string(sensor->collection_interval,
496 GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
497 &report_collection_point,
500 if (GNUNET_YES == sensor->p2p_report)
502 LOG (GNUNET_ERROR_TYPE_INFO,
503 "Will start reporting sensor `%s' values to p2p network every %s.\n",
505 GNUNET_STRINGS_relative_time_to_string(sensor->p2p_interval,
508 GNUNET_CONTAINER_DLL_insert (rc_head, rc_tail, rc);
513 * Function called whenever a channel is destroyed. Should clean up
514 * any associated state.
516 * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
518 * @param cls closure (set from #GNUNET_CADET_connect)
519 * @param channel connection to the other end (henceforth invalid)
520 * @param channel_ctx place where local state associated
521 * with the channel is stored
523 static void cadet_channel_destroyed (void *cls,
524 const struct GNUNET_CADET_Channel *channel,
527 struct CadetChannelContext *cc = channel_ctx;
529 if (GNUNET_YES == cc->destroying)
531 LOG (GNUNET_ERROR_TYPE_DEBUG,
532 "Received a `channel destroyed' notification from CADET, "
534 GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
536 destroy_cadet_channel_context (cc);
540 * Start the sensor reporting module
542 * @param c our service configuration
543 * @param sensors multihashmap of loaded sensors
544 * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
547 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
548 struct GNUNET_CONTAINER_MultiHashMap *sensors)
550 static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
554 LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n");
555 GNUNET_assert(NULL != sensors);
557 peerstore = GNUNET_PEERSTORE_connect(cfg);
558 if (NULL == peerstore)
560 LOG (GNUNET_ERROR_TYPE_ERROR,
561 _("Failed to connect to peerstore service.\n"));
562 SENSOR_reporting_stop ();
563 return GNUNET_SYSERR;
565 cadet = GNUNET_CADET_connect(cfg,
568 &cadet_channel_destroyed,
573 LOG (GNUNET_ERROR_TYPE_ERROR,
574 _("Failed to connect to CADET service.\n"));
575 SENSOR_reporting_stop ();
576 return GNUNET_SYSERR;
578 GNUNET_CRYPTO_get_peer_identity(cfg, &mypeerid);
579 GNUNET_CONTAINER_multihashmap_iterate(sensors, &init_sensor_reporting, NULL);
584 /* end of gnunet-service-sensor-reporting.c */