2 This file is part of GNUnet.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file sensor/gnunet-service-sensor_reporting.c
23 * @brief sensor service reporting functionality
24 * @author Omar Tarabai
27 #include "gnunet_util_lib.h"
29 #include "gnunet_peerstore_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
/**
 * Log a message under the "sensor-reporting" component label.
 */
#define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
/**
 * Anomaly bookkeeping for one sensor, kept in a doubly-linked list
 * (see ai_head / ai_tail).
 */
struct AnomalyInfo
{

  /**
   * DLL
   */
  struct AnomalyInfo *prev;

  /**
   * DLL
   */
  struct AnomalyInfo *next;

  /**
   * Sensor this record belongs to
   */
  struct GNUNET_SENSOR_SensorInfo *sensor;

  /**
   * Current anomalous status of sensor (#GNUNET_YES or #GNUNET_NO).
   *
   * NOTE(review): the declaration was not visible in the extracted
   * listing; `int` is inferred from the assignments made to this field.
   */
  int anomalous;

  /**
   * List of peers that reported an anomaly for this sensor
   */
  struct GNUNET_CONTAINER_MultiPeerMap *anomalous_neighbors;

};
73 struct ValueInfo *prev;
78 struct ValueInfo *next;
83 struct GNUNET_SENSOR_SensorInfo *sensor;
86 * Last value read from sensor
91 * Size of @e last_value
93 size_t last_value_size;
96 * Timestamp of last value reading
98 struct GNUNET_TIME_Absolute last_value_timestamp;
101 * Has the last value seen already been reported to collection point?
103 int last_value_reported;
106 * Watcher of sensor values
108 struct GNUNET_PEERSTORE_WatchContext *wc;
111 * Collection point reporting task (or #GNUNET_SCHEDULER_NO_TASK)
113 GNUNET_SCHEDULER_TaskIdentifier reporting_task;
/**
 * Information about a connected CORE peer.
 * Note that we only know about a connected peer if it is running the same
 * application (sensor anomaly reporting) as us.
 */
struct CorePeer
{

  /**
   * DLL
   */
  struct CorePeer *prev;

  /**
   * DLL
   */
  struct CorePeer *next;

  /**
   * Peer identity of connected peer.
   *
   * NOTE(review): this is the pointer handed to the CORE connect
   * callback, not a copy; it is only valid for as long as CORE keeps
   * that memory alive -- confirm against the CORE API.
   */
  struct GNUNET_PeerIdentity *peer_id;

  /**
   * Message queue for messages to be sent to this peer
   */
  struct GNUNET_MQ_Handle *mq;

};
148 * Information about a connected CADET peer (collection point).
156 struct CadetPeer *prev;
161 struct CadetPeer *next;
166 struct GNUNET_PeerIdentity peer_id;
169 * CADET channel handle
171 struct GNUNET_CADET_Channel *channel;
174 * Message queue for messages to be sent to this peer
176 struct GNUNET_MQ_Handle *mq;
179 * Are we currently destroying the channel and its context?
189 static const struct GNUNET_CONFIGURATION_Handle *cfg;
192 * Multihashmap of loaded sensors
194 static struct GNUNET_CONTAINER_MultiHashMap *sensors;
197 * Handle to peerstore service
199 static struct GNUNET_PEERSTORE_Handle *peerstore;
202 * Handle to core service
204 static struct GNUNET_CORE_Handle *core;
207 * Handle to CADET service
209 static struct GNUNET_CADET_Handle *cadet;
214 static struct GNUNET_PeerIdentity mypeerid;
217 * Head of DLL of anomaly info structs
219 static struct AnomalyInfo *ai_head;
222 * Tail of DLL of anomaly info structs
224 static struct AnomalyInfo *ai_tail;
227 * Head of DLL of value info structs
229 static struct ValueInfo *vi_head;
232 * Tail of DLL of value info structs
234 static struct ValueInfo *vi_tail;
237 * Head of DLL of CORE peers
239 static struct CorePeer *corep_head;
242 * Tail of DLL of CORE peers
244 static struct CorePeer *corep_tail;
247 * Head of DLL of CADET peers
249 static struct CadetPeer *cadetp_head;
252 * Tail of DLL of CADET peers
254 static struct CadetPeer *cadetp_tail;
257 * Is the module started?
259 static int module_running = GNUNET_NO;
262 * Number of known neighborhood peers
264 static int neighborhood;
268 /******************************************************************************/
269 /****************************** CLEANUP ******************************/
270 /******************************************************************************/
273 * Destroy anomaly info struct
275 * @param ai struct to destroy
278 destroy_anomaly_info (struct AnomalyInfo *ai)
280 if (NULL != ai->anomalous_neighbors)
281 GNUNET_CONTAINER_multipeermap_destroy (ai->anomalous_neighbors);
287 * Destroy value info struct
289 * @param vi struct to destroy
292 destroy_value_info (struct ValueInfo *vi)
296 GNUNET_PEERSTORE_watch_cancel (vi->wc);
299 if (GNUNET_SCHEDULER_NO_TASK != vi->reporting_task)
301 GNUNET_SCHEDULER_cancel (vi->reporting_task);
302 vi->reporting_task = GNUNET_SCHEDULER_NO_TASK;
304 if (NULL != vi->last_value)
306 GNUNET_free (vi->last_value);
307 vi->last_value = NULL;
314 * Destroy core peer struct
316 * @param corep struct to destroy
319 destroy_core_peer (struct CorePeer *corep)
321 struct AnomalyInfo *ai;
323 if (NULL != corep->mq)
325 GNUNET_MQ_destroy (corep->mq);
331 GNUNET_assert (NULL != ai->anomalous_neighbors);
332 GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors,
341 * Destroy cadet peer struct
343 * @param cadetp struct to destroy
346 destroy_cadet_peer (struct CadetPeer *cadetp)
348 cadetp->destroying = GNUNET_YES;
349 if (NULL != cadetp->mq)
351 GNUNET_MQ_destroy (cadetp->mq);
354 if (NULL != cadetp->channel)
356 GNUNET_CADET_channel_destroy (cadetp->channel);
357 cadetp->channel = NULL;
359 GNUNET_free (cadetp);
364 * Stop sensor reporting module
367 SENSOR_reporting_stop ()
369 struct ValueInfo *vi;
370 struct CorePeer *corep;
371 struct AnomalyInfo *ai;
372 struct CadetPeer *cadetp;
374 LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor anomaly reporting module.\n");
375 module_running = GNUNET_NO;
377 /* Destroy value info's */
381 GNUNET_CONTAINER_DLL_remove (vi_head, vi_tail, vi);
382 destroy_value_info (vi);
385 /* Destroy core peers */
387 while (NULL != corep)
389 GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
390 destroy_core_peer (corep);
393 /* Destroy anomaly info's */
397 GNUNET_CONTAINER_DLL_remove (ai_head, ai_tail, ai);
398 destroy_anomaly_info (ai);
401 /* Destroy cadet peers */
402 cadetp = cadetp_head;
403 while (NULL != cadetp)
405 GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
406 destroy_cadet_peer (cadetp);
407 cadetp = cadetp_head;
409 /* Disconnect from other services */
412 GNUNET_CORE_disconnect (core);
415 if (NULL != peerstore)
417 GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_NO);
422 GNUNET_CADET_disconnect (cadet);
428 /******************************************************************************/
429 /****************************** HELPERS ******************************/
430 /******************************************************************************/
434 * Gets the anomaly info struct related to the given sensor
436 * @param sensor Sensor to search by
438 static struct AnomalyInfo *
439 get_anomaly_info_by_sensor (struct GNUNET_SENSOR_SensorInfo *sensor)
441 struct AnomalyInfo *ai;
446 if (ai->sensor == sensor)
457 * Returns context of a connected CADET peer.
458 * Creates it first if didn't exist before.
460 * @param pid Peer Identity
461 * @return Context of connected CADET peer
463 static struct CadetPeer *
464 get_cadet_peer (struct GNUNET_PeerIdentity pid)
466 struct CadetPeer *cadetp;
468 cadetp = cadetp_head;
469 while (NULL != cadetp)
471 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cadetp->peer_id))
473 cadetp = cadetp->next;
475 /* Not found, create struct and channel */
476 cadetp = GNUNET_new (struct CadetPeer);
477 cadetp->peer_id = pid;
479 GNUNET_CADET_channel_create (cadet, cadetp, &pid,
480 GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
481 GNUNET_CADET_OPTION_DEFAULT);
482 cadetp->mq = GNUNET_CADET_mq_create (cadetp->channel);
483 GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp);
489 * Create an anomaly report message from a given anomaly info struct inside a
492 * @param ai Anomaly info struct to use
493 * @return Envelope with message
495 static struct GNUNET_MQ_Envelope *
496 create_anomaly_report_message (struct AnomalyInfo *ai)
498 struct GNUNET_SENSOR_AnomalyReportMessage *arm;
499 struct GNUNET_MQ_Envelope *ev;
501 ev = GNUNET_MQ_msg (arm, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT);
502 GNUNET_CRYPTO_hash (ai->sensor->name, strlen (ai->sensor->name) + 1,
503 &arm->sensorname_hash);
504 arm->sensorversion_major = htons (ai->sensor->version_major);
505 arm->sensorversion_minor = htons (ai->sensor->version_minor);
506 arm->anomalous = htons (ai->anomalous);
507 arm->anomalous_neighbors =
508 ((float) GNUNET_CONTAINER_multipeermap_size (ai->anomalous_neighbors)) /
515 * Create a sensor value message from a given value info struct inside a MQ
518 * @param vi Value info struct to use
519 * @return Envelope with message
521 static struct GNUNET_MQ_Envelope *
522 create_value_message (struct ValueInfo *vi)
524 struct GNUNET_SENSOR_ValueMessage *vm;
525 struct GNUNET_MQ_Envelope *ev;
527 ev = GNUNET_MQ_msg_extra (vm, vi->last_value_size, GNUNET_MESSAGE_TYPE_SENSOR_READING);
528 GNUNET_CRYPTO_hash (vi->sensor->name, strlen (vi->sensor->name) + 1,
529 &vm->sensorname_hash);
530 vm->sensorversion_major = htons (vi->sensor->version_major);
531 vm->sensorversion_minor = htons (vi->sensor->version_minor);
532 vm->timestamp = vi->last_value_timestamp;
533 vm->value_size = htons (vi->last_value_size);
534 memcpy (&vm[1], vi->last_value, vi->last_value_size);
/**
 * Send given anomaly info report by putting it in the given message queue.
 *
 * @param mq Message queue to put the message in
 * @param ai Anomaly info to report
 */
static void
send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai)
{
  GNUNET_MQ_send (mq, create_anomaly_report_message (ai));
}
555 /******************************************************************************/
556 /*************************** CORE Handlers ***************************/
557 /******************************************************************************/
561 * An inbound anomaly report is received from a peer through CORE.
563 * @param cls closure (unused)
564 * @param peer the other peer involved
565 * @param message the actual message
566 * @return #GNUNET_OK to keep the connection open,
567 * #GNUNET_SYSERR to close connection to the peer (signal serious error)
570 handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other,
571 const struct GNUNET_MessageHeader *message)
573 struct GNUNET_SENSOR_AnomalyReportMessage *arm;
574 struct GNUNET_SENSOR_SensorInfo *sensor;
575 struct AnomalyInfo *ai;
576 struct CadetPeer *cadetp;
579 arm = (struct GNUNET_SENSOR_AnomalyReportMessage *) message;
580 sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &arm->sensorname_hash);
581 if (NULL == sensor || sensor->version_major != arm->sensorversion_major ||
582 sensor->version_minor != arm->sensorversion_minor)
584 LOG (GNUNET_ERROR_TYPE_WARNING,
585 "I don't have the sensor reported by the peer `%s'.\n",
589 ai = get_anomaly_info_by_sensor (sensor);
590 GNUNET_assert (NULL != ai);
592 GNUNET_CONTAINER_multipeermap_contains (ai->anomalous_neighbors, other);
593 if (GNUNET_YES == ai->anomalous)
595 if (GNUNET_YES == peer_in_list)
598 GNUNET_CONTAINER_multipeermap_put (ai->anomalous_neighbors, other, NULL,
599 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
603 if (GNUNET_NO == peer_in_list)
606 GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors, other);
608 /* Send anomaly update to collection point */
609 if (NULL != ai->sensor->collection_point &&
610 GNUNET_YES == ai->sensor->report_anomalies)
612 cadetp = get_cadet_peer (*ai->sensor->collection_point);
613 send_anomaly_report (cadetp->mq, ai);
619 /******************************************************************************/
620 /************************ PEERSTORE callbacks ************************/
621 /******************************************************************************/
625 * Sensor value watch callback
627 * @param cls Closure, ValueInfo struct related to the sensor we are watching
628 * @param record PEERSTORE new record, NULL if error
629 * @param emsg Error message, NULL if no error
630 * @return GNUNET_YES to continue watching
633 value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
635 struct ValueInfo *vi = cls;
639 LOG (GNUNET_ERROR_TYPE_ERROR,
640 _("PEERSTORE error: %s.\n"), emsg);
643 if (NULL != vi->last_value)
645 GNUNET_free (vi->last_value);
646 vi->last_value_size = 0;
648 vi->last_value = GNUNET_memdup (record->value, record->value_size);
649 vi->last_value_size = record->value_size;
650 vi->last_value_timestamp = GNUNET_TIME_absolute_get();
651 vi->last_value_reported = GNUNET_NO;
656 /******************************************************************************/
657 /************************** CORE callbacks ***************************/
658 /******************************************************************************/
662 * Method called whenever a CORE peer disconnects.
664 * @param cls closure (unused)
665 * @param peer peer identity this notification is about
668 core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
670 struct CorePeer *corep;
672 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
676 while (NULL != corep)
678 if (peer == corep->peer_id)
680 GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
681 destroy_core_peer (corep);
686 LOG (GNUNET_ERROR_TYPE_ERROR,
687 _("Received disconnect notification from CORE"
688 " for a peer we didn't know about.\n"));
693 * Method called whenever a given peer connects through CORE.
695 * @param cls closure (unused)
696 * @param peer peer identity this notification is about
699 core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
701 struct CorePeer *corep;
702 struct AnomalyInfo *ai;
704 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
707 corep = GNUNET_new (struct CorePeer);
708 corep->peer_id = (struct GNUNET_PeerIdentity *) peer;
709 corep->mq = GNUNET_CORE_mq_create (core, peer);
710 GNUNET_CONTAINER_DLL_insert (corep_head, corep_tail, corep);
711 /* Send any locally anomalous sensors to the new peer */
715 if (GNUNET_YES == ai->anomalous)
716 send_anomaly_report (corep->mq, ai);
723 * Function called after #GNUNET_CORE_connect has succeeded (or failed
724 * for good). Note that the private key of the peer is intentionally
725 * not exposed here; if you need it, your process should try to read
726 * the private key file directly (which should work if you are
727 * authorized...). Implementations of this function must not call
728 * #GNUNET_CORE_disconnect (other than by scheduling a new task to
731 * @param cls closure (unused)
732 * @param my_identity ID of this peer, NULL if we failed
735 core_startup_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity)
737 if (NULL == my_identity)
739 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
740 SENSOR_reporting_stop ();
743 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, my_identity))
745 LOG (GNUNET_ERROR_TYPE_ERROR,
746 _("Peer identity received from CORE init doesn't match ours.\n"));
747 SENSOR_reporting_stop ();
753 /******************************************************************************/
754 /************************* CADET callbacks ***************************/
755 /******************************************************************************/
758 * Function called whenever a channel is destroyed. Should clean up
759 * any associated state.
761 * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
763 * @param cls closure (set from #GNUNET_CADET_connect)
764 * @param channel connection to the other end (henceforth invalid)
765 * @param channel_ctx place where local state associated
766 * with the channel is stored
769 cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel,
772 struct CadetPeer *cadetp = channel_ctx;
774 if (GNUNET_YES == cadetp->destroying)
776 GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
777 cadetp->channel = NULL;
778 destroy_cadet_peer (cadetp);
782 /******************************************************************************/
783 /********************** Local anomaly receiver ***********************/
784 /******************************************************************************/
788 * Used by the analysis module to tell the reporting module about a change in
789 * the anomaly status of a sensor.
791 * @param sensor Related sensor
792 * @param anomalous The new sensor anomalous status
795 SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor,
798 struct AnomalyInfo *ai;
799 struct CorePeer *corep;
800 struct CadetPeer *cadetp;
802 if (GNUNET_NO == module_running)
804 ai = get_anomaly_info_by_sensor (sensor);
805 GNUNET_assert (NULL != ai);
806 ai->anomalous = anomalous;
807 /* Report change to all neighbors */
809 while (NULL != corep)
811 send_anomaly_report (corep->mq, ai);
814 if (NULL != ai->sensor->collection_point &&
815 GNUNET_YES == ai->sensor->report_anomalies)
817 cadetp = get_cadet_peer (*ai->sensor->collection_point);
818 send_anomaly_report (cadetp->mq, ai);
823 /******************************************************************************/
824 /******************* Reporting values (periodic) *********************/
825 /******************************************************************************/
829 * Task scheduled to send values to collection point
831 * @param cls closure, a `struct ValueReportingContext *`
835 report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
837 struct ValueInfo *vi = cls;
838 struct GNUNET_SENSOR_SensorInfo *sensor = vi->sensor;
839 struct CadetPeer *cadetp;
840 struct GNUNET_MQ_Envelope *ev;
843 GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
845 if (0 == vi->last_value_size ||
846 GNUNET_YES == vi->last_value_reported)
848 LOG (GNUNET_ERROR_TYPE_WARNING,
849 "Did not receive a fresh value from `%s' to report.\n",
853 LOG (GNUNET_ERROR_TYPE_DEBUG,
854 "Now trying to report last seen value of `%s' to collection point.\n",
856 cadetp = get_cadet_peer (*sensor->collection_point);
857 ev = create_value_message (vi);
858 GNUNET_MQ_send (cadetp->mq, ev);
859 vi->last_value_reported = GNUNET_YES;
863 /******************************************************************************/
864 /******************************** INIT *******************************/
865 /******************************************************************************/
869 * Iterator for defined sensors and creates anomaly info context
873 * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information
874 * @return #GNUNET_YES to continue iterations
877 init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
880 struct GNUNET_SENSOR_SensorInfo *sensor = value;
881 struct AnomalyInfo *ai;
882 struct ValueInfo *vi;
884 /* Create sensor anomaly info context */
885 ai = GNUNET_new (struct AnomalyInfo);
887 ai->anomalous = GNUNET_NO;
888 ai->anomalous_neighbors =
889 GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
890 GNUNET_CONTAINER_DLL_insert (ai_head, ai_tail, ai);
891 /* Create sensor value info context (if needed to be reported) */
892 if (NULL == sensor->collection_point || GNUNET_NO == sensor->report_values)
894 LOG (GNUNET_ERROR_TYPE_INFO,
895 "Reporting sensor `%s' values to collection point `%s' every %s.\n",
896 sensor->name, GNUNET_i2s_full (sensor->collection_point),
897 GNUNET_STRINGS_relative_time_to_string (sensor->value_reporting_interval,
899 vi = GNUNET_new (struct ValueInfo);
901 vi->last_value = NULL;
902 vi->last_value_size = 0;
903 vi->last_value_reported = GNUNET_NO;
905 GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
906 &value_watch_cb, vi);
908 GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
910 GNUNET_CONTAINER_DLL_insert (vi_head, vi_tail, vi);
916 * Start the sensor anomaly reporting module
918 * @param c our service configuration
919 * @param s multihashmap of loaded sensors
920 * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
923 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
924 struct GNUNET_CONTAINER_MultiHashMap *s)
926 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
927 {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT,
928 sizeof (struct GNUNET_SENSOR_AnomalyReportMessage)},
931 static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
935 LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n");
936 GNUNET_assert (NULL != s);
939 /* Connect to PEERSTORE */
940 peerstore = GNUNET_PEERSTORE_connect (cfg);
941 if (NULL == peerstore)
943 LOG (GNUNET_ERROR_TYPE_ERROR,
944 _("Failed to connect to peerstore service.\n"));
945 SENSOR_reporting_stop ();
946 return GNUNET_SYSERR;
948 /* Connect to CORE */
950 GNUNET_CORE_connect (cfg, NULL, &core_startup_cb, core_connect_cb,
951 &core_disconnect_cb, NULL, GNUNET_YES, NULL,
952 GNUNET_YES, core_handlers);
955 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
956 SENSOR_reporting_stop ();
957 return GNUNET_SYSERR;
959 /* Connect to CADET */
961 GNUNET_CADET_connect (cfg, NULL, NULL, &cadet_channel_destroyed,
962 cadet_handlers, NULL);
965 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CADET service.\n"));
966 SENSOR_reporting_stop ();
967 return GNUNET_SYSERR;
969 GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid);
970 GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL);
972 module_running = GNUNET_YES;
976 /* end of gnunet-service-sensor_reporting.c */