2 This file is part of GNUnet.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file sensor/gnunet-service-sensor-reporting.c
23 * @brief sensor service reporting functionality
24 * @author Omar Tarabai
27 #include "gnunet_util_lib.h"
29 #include "gnunet_peerstore_service.h"
30 #include "gnunet_cadet_service.h"
31 #include "gnunet_applications.h"
/* Module-local logging helper: forwards to GNUNET_log_from with the fixed
   component name "sensor-reporting". */
33 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
36 * Context of reporting operations
/* Per-sensor reporting state. Instances are created in init_sensor_reporting
   and kept in the list anchored at rc_head / rc_tail. */
38 struct ReportingContext
/* prev / next: presumably the link fields used by the GNUNET_CONTAINER_DLL_*
   macros -- confirm against the macro definitions. */
44 struct ReportingContext *prev;
49 struct ReportingContext *next;
/* Sensor this context reports on; its name, version, collection point and
   collection interval are read through this pointer. */
54 struct SensorInfo *sensor;
57 * Collection point reporting task
58 * (OR GNUNET_SCHEDULER_NO_TASK)
/* Reset to GNUNET_SCHEDULER_NO_TASK when report_collection_point runs, then
   set again to the id of the re-scheduled task. */
60 GNUNET_SCHEDULER_TaskIdentifier cp_task;
63 * Watcher of sensor values
/* Obtained from GNUNET_PEERSTORE_watch; cancelled in
   destroy_reporting_context. */
65 struct GNUNET_PEERSTORE_WatchContext *wc;
68 * Last value read from sensor
/* Byte length of the buffer held in last_value; 0 is treated by
   report_collection_point as "no reading received yet". */
75 size_t last_value_size;
78 * Timestamp of last value reading
85 * Context of a created CADET channel
/* State of one CADET channel to a collection point. Instances are created in
   get_cadet_channel and kept in the list anchored at cc_head / cc_tail. */
87 struct CadetChannelContext
/* prev / next: presumably the link fields used by the GNUNET_CONTAINER_DLL_*
   macros -- confirm against the macro definitions. */
93 struct CadetChannelContext *prev;
98 struct CadetChannelContext *next;
/* Peer identity this channel belongs to; get_cadet_channel compares the
   requested identity against this field. */
103 struct GNUNET_PeerIdentity pid;
106 * CADET channel handle
108 struct GNUNET_CADET_Channel *c;
111 * Are we sending data on this channel?
112 * #GNUNET_YES / #GNUNET_NO
117 * Pointer to a pending message to be sent over the channel
122 * Size of @pending_msg
/* Set together with pending_msg in report_collection_point and reset to 0 in
   do_report_collection_point. */
124 size_t pending_msg_size;
/* NOTE(review): "tranmission" in the line below is a typo for
   "transmission". */
127 * Handle to CADET tranmission request in case we are sending
128 * (sending == GNUNET_YES)
130 struct GNUNET_CADET_TransmitHandle *th;
/* File-scope state of the reporting module. */
/* Configuration handle passed to GNUNET_PEERSTORE_connect,
   GNUNET_CADET_connect and GNUNET_CRYPTO_get_peer_identity. */
137 static const struct GNUNET_CONFIGURATION_Handle *cfg;
140 * Handle to peerstore service
142 static struct GNUNET_PEERSTORE_Handle *peerstore;
/* Filled in by GNUNET_CRYPTO_get_peer_identity in SENSOR_reporting_start. */
147 static struct GNUNET_PeerIdentity mypeerid;
150 * Handle to CADET service
152 static struct GNUNET_CADET_Handle *cadet;
/* NOTE(review): the four list head/tail pointers below are not declared
   `static`, unlike the handles above, so they have external linkage.
   Confirm no other translation unit relies on that before narrowing them. */
155 * Head of DLL of all reporting contexts
157 struct ReportingContext *rc_head;
160 * Tail of DLL of all reporting contexts
162 struct ReportingContext *rc_tail;
165 * Head of DLL of all cadet channels
167 struct CadetChannelContext *cc_head;
170 * Tail of DLL of all cadet channels
172 struct CadetChannelContext *cc_tail;
176 * Destroy a reporting context structure
/* Releases the resources held by one reporting context: the peerstore watch,
   a pending collection point task and the cached last value.
   @param rc reporting context to tear down. The caller (SENSOR_reporting_stop)
             removes it from the rc_head / rc_tail list before calling. */
179 destroy_reporting_context (struct ReportingContext *rc)
183 GNUNET_PEERSTORE_watch_cancel (rc->wc);
/* Only cancel when a task is actually scheduled; the id is reset afterwards
   so it cannot be cancelled a second time. */
186 if (GNUNET_SCHEDULER_NO_TASK != rc->cp_task)
188 GNUNET_SCHEDULER_cancel(rc->cp_task);
189 rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
191 if (NULL != rc->last_value)
193 GNUNET_free (rc->last_value);
194 rc->last_value_size = 0;
200 * Destroy a CADET channel context struct
/* Releases the resources held by one CADET channel context: a pending
   transmit request, a queued message and the channel itself.
   @param cc channel context to tear down. Callers remove it from the
             cc_head / cc_tail list before calling. */
203 destroy_cadet_channel_context (struct CadetChannelContext *cc)
207 GNUNET_CADET_notify_transmit_ready_cancel (cc->th);
/* A message that was queued but never handed to CADET is dropped here. */
210 if (NULL != cc->pending_msg)
212 GNUNET_free (cc->pending_msg);
213 cc->pending_msg = NULL;
/* NOTE(review): cadet_channel_destroyed also calls this function, and its
   contract says GNUNET_CADET_channel_destroy must NOT be called on a channel
   CADET has already destroyed. Confirm cc->c is cleared or checked on that
   path before this call is reached. */
217 GNUNET_CADET_channel_destroy (cc->c);
224 * Stop sensor reporting module
/* Shuts the reporting module down: destroys every CADET channel context and
   every reporting context, then disconnects from peerstore and CADET.
   Also used by SENSOR_reporting_start as its error-path cleanup. */
226 void SENSOR_reporting_stop ()
228 struct ReportingContext *rc;
229 struct CadetChannelContext *cc;
231 LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor reporting module.\n");
/* Drain the channel list first, then the reporting contexts; each entry is
   unlinked before it is destroyed. */
232 while (NULL != cc_head)
235 GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
236 destroy_cadet_channel_context (cc);
238 while (NULL != rc_head)
241 GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, rc);
242 destroy_reporting_context (rc);
244 if (NULL != peerstore)
246 GNUNET_PEERSTORE_disconnect (peerstore);
/* NOTE(review): SENSOR_reporting_start calls this function when the
   peerstore connection failed, i.e. before `cadet` was assigned. Confirm
   this disconnect is guarded against a NULL handle. */
251 GNUNET_CADET_disconnect (cadet);
257 * Returns CADET channel established to given peer
258 * or creates a new one
260 * @param pid Peer Identity
261 * @return Context of established cadet channel
/* Looks up the channel context for a peer by comparing identities and, when
   none matches, creates a new context with a fresh CADET channel on the
   SENSORDASHBOARD application port and adds it to the cc_head / cc_tail list.
   `pid` is passed by value, so the whole identity struct is copied per call. */
263 static struct CadetChannelContext *
264 get_cadet_channel (struct GNUNET_PeerIdentity pid)
266 struct CadetChannelContext *cc;
/* A comparison result of 0 means the identities are equal. */
271 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cc->pid))
/* No existing channel matched: build a new context. */
275 cc = GNUNET_new (struct CadetChannelContext);
276 cc->c = GNUNET_CADET_channel_create(cadet,
279 GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
280 GNUNET_CADET_OPTION_DEFAULT);
/* NOTE(review): confirm cc->pid is set to `pid` for the new context;
   otherwise the lookup above can never match it on a later call. */
282 cc->sending = GNUNET_NO;
283 GNUNET_CONTAINER_DLL_insert (cc_head, cc_tail, cc);
288 * Construct a reading message ready to be sent over CADET channel
290 * @param rc reporting context to read data from
291 * @param msg used to return the created message structure
292 * @return size of created message
/* Allocates a GNUNET_SENSOR_Reading message and fills it from the reporting
   context. The fixed-size struct is followed by the sensor name (including
   its terminating NUL) and then the raw last value. Multi-byte header fields
   are converted with htons / GNUNET_htobe64. */
295 construct_reading_message (struct ReportingContext *rc,
296 struct GNUNET_SENSOR_Reading **msg)
298 struct GNUNET_SENSOR_Reading *ret;
299 size_t sensorname_size;
/* +1 so the terminating NUL byte of the name is transmitted as well. */
303 sensorname_size = strlen (rc->sensor->name) + 1;
304 total_size = sizeof(struct GNUNET_SENSOR_Reading) +
307 ret = GNUNET_malloc (total_size);
/* NOTE(review): htons() yields a 16-bit value, so a total_size above 65535
   is silently truncated in the header while the full buffer is still
   allocated and copied. Confirm the size is bounded before this point. */
/* NOTE(review): `ret->header->...` treats `header` as a pointer member. If
   GNUNET_SENSOR_Reading embeds the message header by value this should use
   `.` instead -- confirm against the struct definition. */
308 ret->header->size = htons (total_size);
309 ret->header->type = htons (GNUNET_MESSAGE_TYPE_SENSOR_READING);
310 ret->sensorname_size = GNUNET_htobe64 (sensorname_size);
311 ret->sensorversion_major = htons (rc->sensor->version_major);
312 ret->sensorversion_minor = htons (rc->sensor->version_minor);
313 ret->timestamp = GNUNET_htobe64 (rc->timestamp);
314 ret->value_size = GNUNET_htobe64 (rc->last_value_size);
/* Variable-length payload: the sensor name first, then the value bytes. */
316 memcpy (dummy, rc->sensor->name, sensorname_size);
317 dummy += sensorname_size;
318 memcpy (dummy, rc->last_value, rc->last_value_size);
324 * Function called to notify a client about the connection being ready
325 * to queue more data. @a buf will be NULL and @a size zero if the
326 * connection was closed for writing in the meantime.
329 * @param size number of bytes available in @a buf
330 * @param buf where the callee should write the message
331 * @return number of bytes written to @a buf
/* CADET transmit-ready callback. Copies the pending message of the channel
   context into the transmit buffer, then frees the pending message and marks
   the channel as no longer sending.
   @param cls the struct CadetChannelContext of the channel */
334 do_report_collection_point (void *cls, size_t size, void *buf)
336 struct CadetChannelContext *cc = cls;
339 LOG (GNUNET_ERROR_TYPE_DEBUG, "Copying to CADET transmit buffer.\n");
/* The channel is marked idle whether or not the copy below succeeds. */
340 cc->sending = GNUNET_NO;
/* NOTE(review): `size != pending_msg_size` also rejects a buffer that is
   LARGER than the message. Confirm CADET always offers exactly the requested
   size; otherwise `size < cc->pending_msg_size` may be what is intended. */
341 if (NULL == buf || size != cc->pending_msg_size)
/* NOTE(review): this warning has no trailing "\n", unlike the other log
   messages in this file. */
343 LOG (GNUNET_ERROR_TYPE_WARNING,
344 "CADET failed to transmit message to collection point, discarding.");
348 memcpy (buf, cc->pending_msg, cc->pending_msg_size);
349 written = cc->pending_msg_size;
/* NOTE(review): destroy_cadet_channel_context frees cc->pending_msg whenever
   it is non-NULL. Confirm the pointer is reset to NULL after this free,
   otherwise a later teardown frees the same buffer twice. */
351 GNUNET_free (cc->pending_msg);
352 cc->pending_msg_size = 0;
357 * Task scheduled to send values to collection point
359 * @param cls closure, a 'struct ReportingContext *'
/* Scheduler task that reports the last seen value of a sensor to its
   collection point over CADET. It re-schedules itself after
   sensor->collection_interval on every visible path: when no value has been
   received yet, when the channel is busy, and after a transmission request
   has been queued.
   @param cls the struct ReportingContext of the sensor
   @param tc  scheduler task context; not referenced in the visible code */
362 static void report_collection_point
363 (void *cls, const struct GNUNET_SCHEDULER_TaskContext* tc)
365 struct ReportingContext *rc = cls;
366 struct SensorInfo *sensor = rc->sensor;
367 struct CadetChannelContext *cc;
368 struct GNUNET_SENSOR_Reading *msg;
/* The task is running now, so the stored id no longer names a pending task. */
371 rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
372 if (0 == rc->last_value_size) /* Did not receive a sensor value yet */
374 LOG (GNUNET_ERROR_TYPE_WARNING, "Did not receive a value from `%s' "
375 "to report yet.\n", rc->sensor->name);
376 rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
377 &report_collection_point, rc);
380 LOG (GNUNET_ERROR_TYPE_DEBUG, "Now trying to report last seen value of `%s' "
381 "to collection point.\n", rc->sensor->name);
/* This task is only meaningful for sensors with a collection point. */
382 GNUNET_assert (NULL != sensor->collection_point);
383 cc = get_cadet_channel (*sensor->collection_point);
/* Only one message is kept pending per channel; if a transmission is still
   outstanding, skip this round and retry on the next interval. */
384 if (GNUNET_YES == cc->sending)
386 LOG (GNUNET_ERROR_TYPE_DEBUG,
387 "Cadet channel to collection point busy, "
388 "trying again for sensor `%s' on next interval.\n", rc->sensor->name);
389 rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
390 &report_collection_point, rc);
/* Build the message and park it in the channel context until
   do_report_collection_point is called to copy it out. */
393 msg_size = construct_reading_message (rc, &msg);
394 cc->sending = GNUNET_YES;
395 cc->pending_msg = msg;
396 cc->pending_msg_size = msg_size;
397 cc->th = GNUNET_CADET_notify_transmit_ready (cc->c,
399 sensor->collection_interval,
401 &do_report_collection_point,
403 rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
404 &report_collection_point, rc);
408 * Sensor value watch callback
/* Peerstore watch callback. Replaces the cached last value of the reporting
   context with a private copy of the new record value and stamps it with the
   current absolute time in microseconds.
   @param cls    the struct ReportingContext of the watched sensor
   @param record the newly stored peerstore record */
411 sensor_watch_cb (void *cls,
412 struct GNUNET_PEERSTORE_Record *record,
415 struct ReportingContext *rc = cls;
417 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a sensor `%s' watch value, "
418 "updating notification last_value.\n", rc->sensor->name);
/* Drop the previous reading before storing the new one. */
421 if (NULL != rc->last_value)
423 GNUNET_free (rc->last_value);
424 rc->last_value_size = 0;
/* NOTE(review): `record` is dereferenced below with no NULL check visible
   here. Confirm the callback cannot be invoked with a NULL record (e.g. on
   error). */
426 rc->last_value = GNUNET_malloc(record->value_size);
427 memcpy (rc->last_value, record->value, record->value_size);
428 rc->last_value_size = record->value_size;
429 rc->timestamp = GNUNET_TIME_absolute_get().abs_value_us;
434 * Iterator for defined sensors
435 * Watches sensors for readings to report
439 * @param value a 'struct SensorInfo *' with sensor information
440 * @return #GNUNET_YES to continue iterations
/* Multihashmap iterator run once per loaded sensor. For sensors that report
   to a collection point and/or the p2p network it creates a reporting
   context, starts a peerstore watch, schedules the first collection point
   report where applicable, and adds the context to the rc_head / rc_tail
   list.
   @param cls closure; SENSOR_reporting_start passes NULL
   @param key hash key of the sensor in the map */
443 init_sensor_reporting (void *cls,
444 const struct GNUNET_HashCode *key,
447 struct SensorInfo *sensor = value;
448 struct ReportingContext *rc;
/* Sensors with neither a collection point nor p2p reporting are detected
   here. */
450 if (NULL == sensor->collection_point &&
451 GNUNET_NO == sensor->p2p_report)
453 rc = GNUNET_new (struct ReportingContext);
455 rc->last_value = NULL;
456 rc->last_value_size = 0;
457 rc->wc = GNUNET_PEERSTORE_watch(peerstore,
463 if (NULL != sensor->collection_point)
465 LOG (GNUNET_ERROR_TYPE_INFO,
466 "Will start reporting sensor `%s' values to "
467 "collection point `%s' every %s.\n",
468 sensor->name, GNUNET_i2s_full(sensor->collection_point),
469 GNUNET_STRINGS_relative_time_to_string(sensor->collection_interval,
/* NOTE(review): confirm the task id returned by this call is stored in
   rc->cp_task; destroy_reporting_context relies on that field to cancel a
   pending task. */
472 GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
473 &report_collection_point,
476 if (GNUNET_YES == sensor->p2p_report)
478 LOG (GNUNET_ERROR_TYPE_INFO,
479 "Will start reporting sensor `%s' values to p2p network every %s.\n",
481 GNUNET_STRINGS_relative_time_to_string(sensor->p2p_interval,
484 GNUNET_CONTAINER_DLL_insert (rc_head, rc_tail, rc);
489 * Function called whenever a channel is destroyed. Should clean up
490 * any associated state.
492 * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
494 * @param cls closure (set from #GNUNET_CADET_connect)
495 * @param channel connection to the other end (henceforth invalid)
496 * @param channel_ctx place where local state associated
497 * with the channel is stored
/* CADET cleanup callback, registered through GNUNET_CADET_connect. Unlinks
   the channel context stored with the channel from the cc_head / cc_tail
   list and destroys it. */
499 static void cadet_channel_destroyed (void *cls,
500 const struct GNUNET_CADET_Channel *channel,
503 struct CadetChannelContext *cc = channel_ctx;
505 LOG (GNUNET_ERROR_TYPE_DEBUG,
506 "Received a `channel destroyed' notification from CADET, "
508 GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
/* NOTE(review): destroy_cadet_channel_context calls
   GNUNET_CADET_channel_destroy on cc->c, which this callback's contract
   forbids. Confirm cc->c is cleared before the call below. */
510 destroy_cadet_channel_context (cc);
514 * Start the sensor reporting module
516 * @param c our service configuration
517 * @param sensors multihashmap of loaded sensors
518 * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
/* Starts the reporting module: connects to peerstore and CADET, obtains our
   own peer identity and sets up reporting for every loaded sensor. When
   either connection fails it logs an error, calls SENSOR_reporting_stop and
   returns GNUNET_SYSERR. */
521 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
522 struct GNUNET_CONTAINER_MultiHashMap *sensors)
524 static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
528 LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n");
529 GNUNET_assert(NULL != sensors);
/* NOTE(review): the calls below use the file-scope `cfg`, not the parameter
   `c`. Confirm `cfg` is assigned from `c` before this point. */
531 peerstore = GNUNET_PEERSTORE_connect(cfg);
532 if (NULL == peerstore)
534 LOG (GNUNET_ERROR_TYPE_ERROR,
535 _("Failed to connect to peerstore service.\n"));
/* Reuse the regular shutdown path to undo whatever was already set up. */
536 SENSOR_reporting_stop ();
537 return GNUNET_SYSERR;
539 cadet = GNUNET_CADET_connect(cfg,
542 &cadet_channel_destroyed,
547 LOG (GNUNET_ERROR_TYPE_ERROR,
548 _("Failed to connect to CADET service.\n"));
549 SENSOR_reporting_stop ();
550 return GNUNET_SYSERR;
552 GNUNET_CRYPTO_get_peer_identity(cfg, &mypeerid);
/* Create a reporting context for each sensor that has reporting enabled. */
553 GNUNET_CONTAINER_multihashmap_iterate(sensors, &init_sensor_reporting, NULL);
558 /* end of gnunet-service-sensor-reporting.c */