2 This file is part of GNUnet.
3 (C) 2009 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file sensordashboard/gnunet-service-sensordashboard.c
23 * @brief Service collecting sensor readings from peers
24 * @author Omar Tarabai
28 #include "gnunet_util_lib.h"
29 #include "gnunet_applications.h"
30 #include "sensordashboard.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_sensor_util_lib.h"
33 #include "gnunet_peerstore_service.h"
37 * Context of a connected client peer
/* Per-peer state for one incoming CADET channel. Instances are allocated in
 * cadet_channel_created() and linked into the cp_head/cp_tail list. */
39 struct ClientPeerContext
/* Link fields used by the GNUNET_CONTAINER_DLL_* calls on cp_head/cp_tail
 * (presumably previous/next element -- confirm against the macro docs). */
45 struct ClientPeerContext *prev;
50 struct ClientPeerContext *next;
53 * GNUnet Peer identity
55 struct GNUNET_PeerIdentity peerid;
58 * Handle to the cadet channel
60 struct GNUNET_CADET_Channel *ch;
63 * CADET transmit handle if we requested a transmission
/* Written by trigger_send_next_msg() and cancelled in destroy_clientpeer(). */
65 struct GNUNET_CADET_TransmitHandle *th;
68 * Head of DLL of pending messages to be sent to client
70 struct PendingMessage *pm_head;
73 * Tail of DLL of pending messages to be sent to client
/* queue_msg() appends at the tail; do_send_msg() removes the entry it sends. */
75 struct PendingMessage *pm_tail;
78 * Are we in the process of destroying this context?
85 * Message queued to be sent to a client stored in a DLL
/* Link fields for the per-peer pm_head/pm_tail list
 * (presumably previous/next element -- confirm against the macro docs). */
93 struct PendingMessage *prev;
98 struct PendingMessage *next;
101 * Actual queued message
/* Released with GNUNET_free() once sent (do_send_msg) or when the owning
 * peer context is torn down (destroy_clientpeer). */
103 struct GNUNET_MessageHeader *msg;
108 * Carries a single reading from a sensor
/* Result of parse_reading_message(); the caller frees the reading and its
 * value buffer (see handle_sensor_reading). */
110 struct ClientSensorReading
114 * Sensor this reading is related to
/* Set to the entry found in the sensors hashmap; the entry itself is not
 * copied by parse_reading_message(). */
116 struct GNUNET_SENSOR_SensorInfo *sensor;
119 * Timestamp of taking the reading
137 * Global hashmap of defined sensors
/* Assigned from GNUNET_SENSOR_load_all_sensors() in run() and released with
 * GNUNET_SENSOR_destroy_sensors() in cleanup_task(). */
139 static struct GNUNET_CONTAINER_MultiHashMap *sensors;
142 * Handle to CADET service
144 static struct GNUNET_CADET_Handle *cadet;
147 * Handle to the peerstore service connection
149 static struct GNUNET_PEERSTORE_Handle *peerstore;
152 * Name of this subsystem to be used for peerstore operations
/* Passed as the second argument of GNUNET_PEERSTORE_store() in
 * handle_sensor_reading().
 * NOTE(review): points at a string literal but is not const-qualified. */
154 static char *subsystem = "sensordashboard";
157 * Head of a DLL of all connected client peers
159 static struct ClientPeerContext *cp_head;
162 * Tail of a DLL of all connected client peers
164 static struct ClientPeerContext *cp_tail;
168 * Trigger sending next pending message to the given client peer if any.
170 * @param cp client peer context struct
/* Forward declaration: do_send_msg() calls this function before its
 * definition appears further down in the file. */
173 trigger_send_next_msg (struct ClientPeerContext *cp);
177 * Destroy a given client peer context
179 * @param cp client peer context
/* Tears down one client peer context. Visible steps: flag the context as
 * being destroyed, cancel the outstanding CADET transmit request, unlink
 * queued messages and free their payloads, then destroy the CADET channel.
 * NOTE(review): no NULL checks on cp->th / cp->ch and no loop header over the
 * pending messages are visible in this listing -- confirm the exact control
 * flow against the complete file. */
182 destroy_clientpeer (struct ClientPeerContext *cp)
184 struct PendingMessage *pm;
/* Set first: cadet_channel_destroyed() tests this flag before it calls back
 * into this function. */
186 cp->destroying = GNUNET_YES;
189 GNUNET_CADET_notify_transmit_ready_cancel (cp->th);
/* A queued message is unlinked from the pm_head/pm_tail list before its
 * payload is released. */
195 GNUNET_CONTAINER_DLL_remove (cp->pm_head, cp->pm_tail, pm);
196 GNUNET_free (pm->msg);
202 GNUNET_CADET_channel_destroy (cp->ch);
210 * Task run during shutdown.
/* Shutdown task (scheduled from run()). Visible steps: unlink and destroy
 * client peer contexts from the cp_head/cp_tail list, disconnect from CADET,
 * disconnect from PEERSTORE when a handle exists, release the loaded sensors
 * and ask the scheduler to shut down. cls and tc are not used in the visible
 * lines.
 * NOTE(review): the loop over the client list is not visible here; presumably
 * every remaining context is processed -- confirm. */
216 cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
218 struct ClientPeerContext *cp;
/* The context is removed from the global list before it is destroyed. */
223 GNUNET_CONTAINER_DLL_remove (cp_head, cp_tail, cp);
224 destroy_clientpeer (cp);
/* NOTE(review): run() also schedules this task after a failed CADET
 * connect; no NULL guard on cadet is visible here -- confirm one exists. */
229 GNUNET_CADET_disconnect (cadet);
/* peerstore is still NULL when run() bailed out before or while connecting
 * to PEERSTORE. */
232 if (NULL != peerstore)
234 GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_YES);
237 GNUNET_SENSOR_destroy_sensors (sensors);
238 GNUNET_SCHEDULER_shutdown ();
243 * Function called whenever a channel is destroyed. Should clean up
244 * any associated state.
246 * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
248 * @param cls closure (set from #GNUNET_CADET_connect)
249 * @param channel connection to the other end (henceforth invalid)
250 * @param channel_ctx place where local state associated
251 * with the channel is stored
/* CADET callback for a destroyed channel: treats channel_ctx as the
 * ClientPeerContext of that channel, unlinks it from the cp_head/cp_tail
 * list and tears it down via destroy_clientpeer(). */
254 cadet_channel_destroyed (void *cls,
255 const struct GNUNET_CADET_Channel *channel,
258 struct ClientPeerContext *cp = channel_ctx;
/* destroy_clientpeer() sets this flag before it destroys the channel, so a
 * callback triggered by our own teardown can be told apart here (the action
 * taken in that case is not visible in this listing). */
260 if (GNUNET_YES == cp->destroying)
263 GNUNET_CONTAINER_DLL_remove (cp_head, cp_tail, cp);
/* NOTE(review): destroy_clientpeer() is seen calling
 * GNUNET_CADET_channel_destroy (cp->ch), which the header comment of this
 * callback forbids -- confirm cp->ch is cleared before this call. */
264 destroy_clientpeer (cp);
269 * Method called whenever another peer has added us to a channel
270 * the other peer initiated.
271 * Only called (once) upon reception of data with a message type which was
272 * subscribed to in #GNUNET_CADET_connect.
274 * A call to #GNUNET_CADET_channel_destroy causes the channel to be ignored. In
275 * this case the handler MUST return NULL.
278 * @param channel new handle to the channel
279 * @param initiator peer that started the channel
280 * @param port Port this channel is for.
281 * @param options CadetOption flag field, with all active option bits set to 1.
283 * @return initial channel context for the channel
284 * (can be NULL -- that's not an error)
/* CADET callback for a new incoming channel: logs the initiator, allocates a
 * ClientPeerContext for it, records the peer identity, clears the destroying
 * flag and links the context into the cp_head/cp_tail list. port and options
 * are not used in the visible lines.
 * NOTE(review): storing `channel` in cp->ch and the return statement are not
 * visible in this listing -- confirm against the complete file. */
287 cadet_channel_created (void *cls,
288 struct GNUNET_CADET_Channel *channel,
289 const struct GNUNET_PeerIdentity *initiator,
290 uint32_t port, enum GNUNET_CADET_ChannelOption options)
292 struct ClientPeerContext *cp;
294 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
295 "Received a channel connection from peer `%s'.\n",
296 GNUNET_i2s (initiator));
297 cp = GNUNET_new (struct ClientPeerContext);
/* The identity is copied by value, so the context does not depend on the
 * lifetime of `initiator`. */
298 cp->peerid = *initiator;
300 cp->destroying = GNUNET_NO;
301 GNUNET_CONTAINER_DLL_insert (cp_head, cp_tail, cp);
307 * Function called to notify a client about the connection being ready
308 * to queue more data. @a buf will be NULL and @a size zero if the
309 * connection was closed for writing in the meantime.
311 * Perform the actual sending of the message to client peer.
313 * @param cls closure, a `struct ClientPeerContext *`
314 * @param size number of bytes available in @a buf
315 * @param buf where the callee should write the message
316 * @return number of bytes written to @a buf
/* Transmit-ready callback: copies one queued message of the client peer
 * passed as cls into buf, releases it, and asks for the next transmission.
 * NOTE(review): the assignment of `pm` and the return statements are not
 * visible in this listing; presumably pm is the head of the pending list. */
319 do_send_msg (void *cls, size_t size, void *buf)
321 struct ClientPeerContext *cp = cls;
322 struct PendingMessage *pm;
/* The size field of the queued header is stored in network byte order. */
327 msg_size = ntohs (pm->msg->size);
/* The entry is unlinked before the buffer is validated. */
328 GNUNET_CONTAINER_DLL_remove (cp->pm_head, cp->pm_tail, pm);
/* NOTE(review): pm is already off the list at this point and the visible
 * frees only happen after the memcpy below; if this branch returns early, pm
 * and pm->msg would be leaked -- confirm. */
329 if (NULL == buf || size < msg_size)
331 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
332 _("Error trying to send a message to peer `%s'.\n"),
333 GNUNET_i2s (&cp->peerid));
336 memcpy (buf, pm->msg, msg_size);
337 GNUNET_free (pm->msg);
/* Keeps the queue draining: requests transmission of the next pending
 * message, if any. */
339 trigger_send_next_msg (cp);
345 * Trigger sending next pending message to the given client peer if any.
347 * @param cp client peer context struct
/* Requests a CADET transmission for a pending message of cp and stores the
 * resulting transmit handle in cp->th.
 * NOTE(review): the assignment of `pm`, the action taken when the queue is
 * empty, and the trailing arguments of the notify call are not visible in
 * this listing; presumably pm is cp->pm_head and do_send_msg is the callback
 * -- confirm. */
350 trigger_send_next_msg (struct ClientPeerContext *cp)
352 struct PendingMessage *pm;
/* An empty pending list is tested for first. */
354 if (NULL == cp->pm_head)
/* The requested size is the message's own size field, converted from network
 * byte order; the request uses a FOREVER timeout. */
359 cp->th = GNUNET_CADET_notify_transmit_ready (cp->ch,
361 GNUNET_TIME_UNIT_FOREVER_REL,
362 ntohs (pm->msg->size),
369 * Add a new message to the queue to be sent to the given client peer.
371 * @param msg Message to be queued
372 * @param cp Client peer context
/* Appends a message to the tail of cp's pending list and triggers sending.
 * Callers in this file pass heap-allocated messages and do not free them
 * afterwards; the queue releases pm->msg once sent or on teardown.
 * NOTE(review): storing `msg` in the new entry is not visible in this
 * listing -- confirm. */
375 queue_msg (struct GNUNET_MessageHeader *msg, struct ClientPeerContext *cp)
377 struct PendingMessage *pm;
379 pm = GNUNET_new (struct PendingMessage);
/* Tail insertion keeps messages in the order they were queued. */
381 GNUNET_CONTAINER_DLL_insert_tail (cp->pm_head, cp->pm_tail, pm);
382 trigger_send_next_msg (cp);
387 * Iterates over the defined sensors, creating and sending brief sensor
388 * information to the given client peer over CADET.
390 * @param cls closure, the client peer
391 * @param key sensor key
392 * @param value sensor value
393 * @return #GNUNET_YES to continue iteration
/* Hashmap iterator: builds a GNUNET_SENSOR_SensorBriefMessage for the sensor
 * passed as `value` (name plus major/minor version) and queues it for the
 * client peer passed as cls. `key` is not used in the visible lines. */
396 send_sensor_brief (void *cls,
397 const struct GNUNET_HashCode *key,
400 struct ClientPeerContext *cp = cls;
401 struct GNUNET_SENSOR_SensorInfo *sensor = value;
402 struct GNUNET_SENSOR_SensorBriefMessage *msg;
403 uint16_t sensorname_size;
406 /* Create message struct */
/* The name size includes the terminating NUL byte.
 * NOTE(review): strlen() + 1 is stored in a uint16_t, so a name of 65535
 * bytes or more would be truncated -- confirm names are bounded. */
407 sensorname_size = strlen (sensor->name) + 1;
408 total_size = sizeof (struct GNUNET_SENSOR_SensorBriefMessage) +
410 msg = GNUNET_malloc (total_size);
/* All 16-bit header and payload fields are written in network byte order. */
411 msg->header.size = htons (total_size);
412 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SENSOR_BRIEF);
413 msg->name_size = htons (sensorname_size);
414 msg->version_major = htons (sensor->version_major);
415 msg->version_minor = htons (sensor->version_minor);
/* The name is placed directly after the fixed-size message struct. */
416 memcpy (&msg[1], sensor->name, sensorname_size);
/* msg is not freed here; the pending queue releases it later. */
418 queue_msg ((struct GNUNET_MessageHeader *)msg, cp);
424 * Called with any sensor list request received.
426 * Each time the function must call #GNUNET_CADET_receive_done on the channel
427 * in order to receive the next message. This doesn't need to be immediate:
428 * can be delayed if some processing is done on the message.
430 * @param cls Closure (set from #GNUNET_CADET_connect).
431 * @param channel Connection to the other end.
432 * @param channel_ctx Place to store local state associated with the channel.
433 * @param message The actual message.
434 * @return #GNUNET_OK to keep the channel open,
435 * #GNUNET_SYSERR to close it (signal serious error).
/* CADET handler for a sensor list request: iterates over the loaded sensors,
 * then queues a header-only SENSOR_END message for the requesting peer and
 * signals CADET that the next message may be delivered. The content of
 * `message` is not inspected in the visible lines.
 * NOTE(review): the iterator callback arguments and the return statement are
 * not visible in this listing; presumably send_sensor_brief is the callback
 * with cp as its closure -- confirm. */
438 handle_sensor_list_req (void *cls,
439 struct GNUNET_CADET_Channel *channel,
441 const struct GNUNET_MessageHeader *message)
/* The channel context holds the ClientPeerContext of this channel. */
443 struct ClientPeerContext *cp = *channel_ctx;
444 struct GNUNET_MessageHeader *end_msg;
446 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
447 "Received a sensor list request from peer `%s'.\n",
448 GNUNET_i2s (&cp->peerid));
449 GNUNET_CONTAINER_multihashmap_iterate (sensors,
/* End marker: a bare message header queued after the iteration. */
452 end_msg = GNUNET_new (struct GNUNET_MessageHeader);
453 end_msg->size = htons (sizeof (struct GNUNET_MessageHeader));
454 end_msg->type = htons (GNUNET_MESSAGE_TYPE_SENSOR_END);
455 queue_msg (end_msg, cp);
456 GNUNET_CADET_receive_done (channel);
462 * Parses a sensor reading message struct
464 * @param msg message header received
465 * @param sensors multihashmap of loaded sensors
466 * @return sensor reading struct or NULL if error
/* Validates a reading message and converts it into a newly allocated
 * ClientSensorReading. Visible checks: minimum size, exact total size, known
 * sensor name, matching sensor version, and value size for numeric sensors.
 * The returned reading owns a copy of the value bytes; its sensor pointer
 * refers to the entry stored in the hashmap.
 * NOTE(review): the parameter `sensors` shadows the file-level `sensors`
 * variable. The initial assignment of `dummy` and the return statements are
 * not visible in this listing. */
468 static struct ClientSensorReading *
469 parse_reading_message (const struct GNUNET_MessageHeader *msg,
470 struct GNUNET_CONTAINER_MultiHashMap *sensors)
473 struct GNUNET_SENSOR_ReadingMessage *rm;
474 uint16_t sensorname_size;
478 struct GNUNET_HashCode key;
479 struct GNUNET_SENSOR_SensorInfo *sensor;
480 struct ClientSensorReading *reading;
482 msg_size = ntohs (msg->size);
/* The message must be at least as large as the fixed-size reading struct
 * before any of its fields are read. */
483 if (msg_size < sizeof (struct GNUNET_SENSOR_ReadingMessage))
485 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Invalid reading message size.\n");
488 rm = (struct GNUNET_SENSOR_ReadingMessage *)msg;
489 sensorname_size = ntohs (rm->sensorname_size);
490 value_size = ntohs (rm->value_size);
/* The declared name and value sizes must account for exactly the rest of the
 * message, which bounds the memcpy calls below. */
491 if ((sizeof (struct GNUNET_SENSOR_ReadingMessage)
492 + sensorname_size + value_size) != msg_size)
494 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Invalid reading message size.\n");
/* The name bytes are copied into a temporary buffer, hashed over exactly
 * sensorname_size bytes to form the lookup key, and the buffer is freed. */
498 sensorname = GNUNET_malloc (sensorname_size);
499 memcpy (sensorname, dummy, sensorname_size);
500 GNUNET_CRYPTO_hash(sensorname, sensorname_size, &key);
501 GNUNET_free (sensorname);
502 sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &key);
505 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
506 "Unknown sensor name in reading message.\n");
/* Both version fields arrive in network byte order and must match the
 * locally loaded sensor definition. */
509 if ((sensor->version_minor != ntohs (rm->sensorversion_minor)) ||
510 (sensor->version_major != ntohs (rm->sensorversion_major)))
512 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
513 "Sensor version mismatch in reading message.\n");
/* A sensor whose expected datatype is "numeric" must carry exactly
 * sizeof (double) value bytes. */
516 if (0 == strcmp (sensor->expected_datatype, "numeric") &&
517 sizeof (double) != value_size)
519 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
520 "Invalid value size for a numerical sensor.\n");
523 reading = GNUNET_new (struct ClientSensorReading);
524 reading->sensor = sensor;
/* The timestamp is converted from big-endian to host byte order. */
525 reading->timestamp = GNUNET_be64toh (rm->timestamp);
526 reading->value_size = value_size;
527 reading->value = GNUNET_malloc (value_size);
/* The value bytes follow the sensor name in the message. */
528 dummy += sensorname_size;
529 memcpy (reading->value, dummy, value_size);
535 * Called with any sensor reading messages received from CADET.
537 * Each time the function must call #GNUNET_CADET_receive_done on the channel
538 * in order to receive the next message. This doesn't need to be immediate:
539 * can be delayed if some processing is done on the message.
541 * @param cls Closure (set from #GNUNET_CADET_connect).
542 * @param channel Connection to the other end.
543 * @param channel_ctx Place to store local state associated with the channel.
544 * @param message The actual message.
545 * @return #GNUNET_OK to keep the channel open,
546 * #GNUNET_SYSERR to close it (signal serious error).
/* CADET handler for a sensor reading: parses the message, logs the reading,
 * stores its value in PEERSTORE under the sending peer and the sensor name,
 * frees the parsed reading and signals CADET that the next message may be
 * delivered.
 * NOTE(review): the check on `reading` and the final return statement are not
 * visible in this listing. */
549 handle_sensor_reading (void *cls,
550 struct GNUNET_CADET_Channel *channel,
552 const struct GNUNET_MessageHeader *message)
/* The channel context holds the ClientPeerContext of this channel. */
554 struct ClientPeerContext *cp = *channel_ctx;
555 struct ClientSensorReading *reading;
557 reading = parse_reading_message (message, sensors);
/* An unparsable reading is reported and answered with GNUNET_SYSERR, which
 * this handler's header comment describes as closing the channel. */
560 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
561 "Received an invalid sensor reading from peer `%s'\n",
562 GNUNET_i2s (&cp->peerid));
563 return GNUNET_SYSERR;
/* NOTE(review): the format string has two PRIu64 conversions; PRIu64 requires
 * uint64_t arguments -- confirm the declared type of reading->value_size
 * (not visible here) and that a timestamp argument is passed. */
565 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
566 "Received a sensor reading from peer `%s':\n"
567 "# Sensor name: `%s'\n"
568 "# Timestamp: %" PRIu64 "\n"
569 "# Value size: %" PRIu64 ".\n",
570 GNUNET_i2s (&cp->peerid),
571 reading->sensor->name,
573 reading->value_size);
/* Stored with GNUNET_TIME_UNIT_FOREVER_ABS as expiry and the MULTIPLE store
 * option; no continuation is registered (both trailing arguments are NULL).
 * NOTE(review): the value buffer is freed right after this call returns;
 * presumably PEERSTORE copies the value -- confirm against its API docs. */
574 GNUNET_PEERSTORE_store (peerstore, subsystem, &cp->peerid,
575 reading->sensor->name, reading->value,
576 reading->value_size, GNUNET_TIME_UNIT_FOREVER_ABS,
577 GNUNET_PEERSTORE_STOREOPTION_MULTIPLE, NULL, NULL);
578 GNUNET_free (reading->value);
579 GNUNET_free (reading);
580 GNUNET_CADET_receive_done (channel);
586 * Process sensordashboard requests.
589 * @param server the initialized server
590 * @param cfg configuration to use
/* Service initialisation: loads the sensor definitions, connects to CADET
 * with the channel callbacks, message handlers and ports declared below,
 * connects to PEERSTORE, registers the server handlers and schedules
 * cleanup_task. When a connection attempt fails, an error is logged and
 * cleanup_task is scheduled immediately.
 * NOTE(review): the CADET failure condition, the early returns and the
 * remaining GNUNET_CADET_connect arguments are not visible in this listing.
 * The reading handler is registered with size 0 and the list request with
 * exactly sizeof (struct GNUNET_MessageHeader); presumably 0 means "variable
 * size" -- confirm against the CADET API docs. */
593 run (void *cls, struct GNUNET_SERVER_Handle *server,
594 const struct GNUNET_CONFIGURATION_Handle *cfg)
596 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
599 static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
600 {&handle_sensor_reading,
601 GNUNET_MESSAGE_TYPE_SENSOR_READING, 0},
602 {&handle_sensor_list_req,
603 GNUNET_MESSAGE_TYPE_SENSOR_LIST_REQ,
604 sizeof (struct GNUNET_MessageHeader)},
607 static uint32_t cadet_ports[] = {
608 GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
609 GNUNET_APPLICATION_TYPE_SENSORUPDATE,
610 GNUNET_APPLICATION_TYPE_END
/* Sensor definitions are mandatory: the service asserts if loading fails. */
612 sensors = GNUNET_SENSOR_load_all_sensors ();
613 GNUNET_assert (NULL != sensors);
614 cadet = GNUNET_CADET_connect(cfg,
616 &cadet_channel_created,
617 &cadet_channel_destroyed,
622 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
623 _("Failed to connect to `%s' service.\n"), "CADET");
624 GNUNET_SCHEDULER_add_now (&cleanup_task, NULL);
627 peerstore = GNUNET_PEERSTORE_connect (cfg);
628 if (NULL == peerstore)
630 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
631 _("Failed to connect to `%s' service.\n"), "PEERSTORE");
632 GNUNET_SCHEDULER_add_now (&cleanup_task, NULL);
635 GNUNET_SERVER_add_handlers (server, handlers);
/* cleanup_task is registered with a FOREVER delay; presumably the scheduler
 * runs such tasks at shutdown -- confirm against the scheduler docs. */
636 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &cleanup_task,
642 * The main function for the sensordashboard service.
644 * @param argc number of arguments from the command line
645 * @param argv command line arguments
646 * @return 0 ok, 1 on error
/* Program entry point: hands control to GNUNET_SERVICE_run() for the
 * "sensordashboard" service with run() as the initialisation callback and no
 * service options; the visible expression maps the outcome to exit status 0
 * or 1. */
649 main (int argc, char *const *argv)
652 GNUNET_SERVICE_run (argc, argv, "sensordashboard",
653 GNUNET_SERVICE_OPTION_NONE, &run, NULL)) ? 0 : 1;
656 /* end of gnunet-service-sensordashboard.c */