2 This file is part of GNUnet.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file sensor/gnunet-service-sensor_reporting.c
23 * @brief sensor service reporting functionality
24 * @author Omar Tarabai
27 #include "gnunet_util_lib.h"
29 #include "gnunet_peerstore_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
34 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
38 * While a proof-of-work is still being generated, any anomaly report that
39 * needs to be sent is queued until the generation is complete
41 struct AnomalyReportingQueueItem
47 struct AnomalyReportingQueueItem *prev;
52 struct AnomalyReportingQueueItem *next;
55 * Message queue belonging to the peer that is the destination of the report
57 struct GNUNET_MQ_Handle *dest_mq;
72 struct AnomalyInfo *prev;
77 struct AnomalyInfo *next;
82 struct GNUNET_SENSOR_SensorInfo *sensor;
85 * Current anomalous status of sensor
90 * List of peers that reported an anomaly for this sensor
92 struct GNUNET_CONTAINER_MultiPeerMap *anomalous_neighbors;
95 * Report block with proof-of-work and signature
97 struct GNUNET_SENSOR_crypto_pow_block *report_block;
100 * Context of an operation creating pow and signature
102 struct GNUNET_SENSOR_crypto_pow_context *report_creation_cx;
105 * Head of the queue of pending report destinations
107 struct AnomalyReportingQueueItem *reporting_queue_head;
110 * Tail of the queue of pending report destinations
112 struct AnomalyReportingQueueItem *reporting_queue_tail;
122 struct ValueInfo *prev;
127 struct ValueInfo *next;
132 struct GNUNET_SENSOR_SensorInfo *sensor;
135 * Last value read from sensor
140 * Size of @e last_value
142 size_t last_value_size;
145 * Timestamp of last value reading
147 struct GNUNET_TIME_Absolute last_value_timestamp;
150 * Has the last value seen already been reported to collection point?
152 int last_value_reported;
155 * Watcher of sensor values
157 struct GNUNET_PEERSTORE_WatchContext *wc;
160 * Collection point reporting task (or #GNUNET_SCHEDULER_NO_TASK)
162 GNUNET_SCHEDULER_TaskIdentifier reporting_task;
167 * Information about a connected CORE peer.
168 * Note that we only know about a connected peer if it is running the same
169 * application (sensor anomaly reporting) as us.
177 struct CorePeer *prev;
182 struct CorePeer *next;
185 * Peer identity of connected peer
187 struct GNUNET_PeerIdentity *peer_id;
190 * Message queue for messages to be sent to this peer
192 struct GNUNET_MQ_Handle *mq;
197 * Information about a connected CADET peer (collection point).
205 struct CadetPeer *prev;
210 struct CadetPeer *next;
215 struct GNUNET_PeerIdentity peer_id;
218 * CADET channel handle
220 struct GNUNET_CADET_Channel *channel;
223 * Message queue for messages to be sent to this peer
225 struct GNUNET_MQ_Handle *mq;
228 * Are we currently destroying the channel and its context?
238 static const struct GNUNET_CONFIGURATION_Handle *cfg;
241 * Multihashmap of loaded sensors
243 static struct GNUNET_CONTAINER_MultiHashMap *sensors;
246 * Handle to peerstore service
248 static struct GNUNET_PEERSTORE_Handle *peerstore;
251 * Handle to core service
253 static struct GNUNET_CORE_Handle *core;
256 * Handle to CADET service
258 static struct GNUNET_CADET_Handle *cadet;
263 static struct GNUNET_PeerIdentity mypeerid;
268 static struct GNUNET_CRYPTO_EddsaPrivateKey *private_key;
271 * Head of DLL of anomaly info structs
273 static struct AnomalyInfo *ai_head;
276 * Tail of DLL of anomaly info structs
278 static struct AnomalyInfo *ai_tail;
281 * Head of DLL of value info structs
283 static struct ValueInfo *vi_head;
286 * Tail of DLL of value info structs
288 static struct ValueInfo *vi_tail;
291 * Head of DLL of CORE peers
293 static struct CorePeer *corep_head;
296 * Tail of DLL of CORE peers
298 static struct CorePeer *corep_tail;
301 * Head of DLL of CADET peers
303 static struct CadetPeer *cadetp_head;
306 * Tail of DLL of CADET peers
308 static struct CadetPeer *cadetp_tail;
311 * Is the module started?
313 static int module_running = GNUNET_NO;
316 * Number of known neighborhood peers
318 static int neighborhood;
321 * Parameter that defines the complexity of the proof-of-work
323 static long long unsigned int pow_matching_bits;
327 /******************************************************************************/
328 /****************************** CLEANUP ******************************/
329 /******************************************************************************/
332 * Destroy anomaly info struct
334 * @param ai struct to destroy
337 destroy_anomaly_info (struct AnomalyInfo *ai)
339 struct AnomalyReportingQueueItem *ar_item;
341 ar_item = ai->reporting_queue_head;
342 while (NULL != ar_item)
344 GNUNET_CONTAINER_DLL_remove (ai->reporting_queue_head,
345 ai->reporting_queue_tail, ar_item);
346 GNUNET_free (ar_item);
347 ar_item = ai->reporting_queue_head;
349 if (NULL != ai->report_creation_cx)
351 GNUNET_SENSOR_crypto_pow_sign_cancel (ai->report_creation_cx);
352 ai->report_creation_cx = NULL;
354 if (NULL != ai->report_block)
356 GNUNET_free (ai->report_block);
357 ai->report_block = NULL;
359 if (NULL != ai->anomalous_neighbors)
361 GNUNET_CONTAINER_multipeermap_destroy (ai->anomalous_neighbors);
362 ai->anomalous_neighbors = NULL;
369 * Destroy value info struct
371 * @param vi struct to destroy
374 destroy_value_info (struct ValueInfo *vi)
378 GNUNET_PEERSTORE_watch_cancel (vi->wc);
381 if (GNUNET_SCHEDULER_NO_TASK != vi->reporting_task)
383 GNUNET_SCHEDULER_cancel (vi->reporting_task);
384 vi->reporting_task = GNUNET_SCHEDULER_NO_TASK;
386 if (NULL != vi->last_value)
388 GNUNET_free (vi->last_value);
389 vi->last_value = NULL;
396 * Destroy core peer struct
398 * @param corep struct to destroy
401 destroy_core_peer (struct CorePeer *corep)
403 struct AnomalyInfo *ai;
405 if (NULL != corep->mq)
407 GNUNET_MQ_destroy (corep->mq);
413 GNUNET_assert (NULL != ai->anomalous_neighbors);
414 GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors,
423 * Destroy cadet peer struct
425 * @param cadetp struct to destroy
428 destroy_cadet_peer (struct CadetPeer *cadetp)
430 cadetp->destroying = GNUNET_YES;
431 if (NULL != cadetp->mq)
433 GNUNET_MQ_destroy (cadetp->mq);
436 if (NULL != cadetp->channel)
438 GNUNET_CADET_channel_destroy (cadetp->channel);
439 cadetp->channel = NULL;
441 GNUNET_free (cadetp);
446 * Stop sensor reporting module
449 SENSOR_reporting_stop ()
451 struct ValueInfo *vi;
452 struct CorePeer *corep;
453 struct AnomalyInfo *ai;
454 struct CadetPeer *cadetp;
456 LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor anomaly reporting module.\n");
457 module_running = GNUNET_NO;
459 /* Destroy value info's */
463 GNUNET_CONTAINER_DLL_remove (vi_head, vi_tail, vi);
464 destroy_value_info (vi);
467 /* Destroy core peers */
469 while (NULL != corep)
471 GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
472 destroy_core_peer (corep);
475 /* Destroy anomaly info's */
479 GNUNET_CONTAINER_DLL_remove (ai_head, ai_tail, ai);
480 destroy_anomaly_info (ai);
483 /* Destroy cadet peers */
484 cadetp = cadetp_head;
485 while (NULL != cadetp)
487 GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
488 destroy_cadet_peer (cadetp);
489 cadetp = cadetp_head;
491 /* Disconnect from other services */
494 GNUNET_CORE_disconnect (core);
497 if (NULL != peerstore)
499 GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_NO);
504 GNUNET_CADET_disconnect (cadet);
510 /******************************************************************************/
511 /****************************** HELPERS ******************************/
512 /******************************************************************************/
516 * Gets the anomaly info struct related to the given sensor
518 * @param sensor Sensor to search by
520 static struct AnomalyInfo *
521 get_anomaly_info_by_sensor (struct GNUNET_SENSOR_SensorInfo *sensor)
523 struct AnomalyInfo *ai;
528 if (ai->sensor == sensor)
539 * Returns context of a connected CADET peer.
540 * Creates it first if it did not exist before.
542 * @param pid Peer Identity
543 * @return Context of connected CADET peer
545 static struct CadetPeer *
546 get_cadet_peer (struct GNUNET_PeerIdentity pid)
548 struct CadetPeer *cadetp;
550 cadetp = cadetp_head;
551 while (NULL != cadetp)
553 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cadetp->peer_id))
555 cadetp = cadetp->next;
557 LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating a CADET connection to peer `%s'.\n",
559 /* Not found, create struct and channel */
560 cadetp = GNUNET_new (struct CadetPeer);
561 cadetp->peer_id = pid;
563 GNUNET_CADET_channel_create (cadet, cadetp, &pid,
564 GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
565 GNUNET_CADET_OPTION_RELIABLE);
566 cadetp->mq = GNUNET_CADET_mq_create (cadetp->channel);
567 GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp);
573 * This function is called only when we have a block ready and want to send it
574 * to the given peer (represented by its message queue)
576 * @param mq Message queue to put the message in
577 * @param ai Anomaly info to report
578 * @param type Message type
581 do_send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai,
584 struct GNUNET_MessageHeader *msg;
585 struct GNUNET_MQ_Envelope *ev;
588 GNUNET_assert (NULL != ai->report_block);
590 sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
591 ai->report_block->msg_size;
592 ev = GNUNET_MQ_msg_header_extra (msg, block_size, type);
593 memcpy (&msg[1], ai->report_block, block_size);
594 GNUNET_MQ_send (mq, ev);
599 * Check if we have a signed proof-of-work block ready.
600 * If yes, we send the report directly; if not, we enqueue the report until
601 * the block is ready.
603 * @param mq Message queue to put the message in
604 * @param ai Anomaly info to report
605 * @param p2p Is the report sent to a neighboring peer
608 send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai,
611 struct AnomalyReportingQueueItem *ar_item;
616 p2p) ? GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P :
617 GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT;
618 if (NULL == ai->report_block)
620 ar_item = GNUNET_new (struct AnomalyReportingQueueItem);
622 ar_item->dest_mq = mq;
623 ar_item->type = type;
624 GNUNET_CONTAINER_DLL_insert_tail (ai->reporting_queue_head,
625 ai->reporting_queue_tail, ar_item);
629 do_send_anomaly_report (mq, ai, type);
635 * Callback invoked when the crypto module has finished creating the
636 * proof-of-work and signature for an anomaly report.
638 * @param cls Closure, a `struct AnomalyInfo *`
639 * @param block The resulting block, NULL on error
642 report_creation_cb (void *cls, struct GNUNET_SENSOR_crypto_pow_block *block)
644 struct AnomalyInfo *ai = cls;
645 struct AnomalyReportingQueueItem *ar_item;
647 ai->report_creation_cx = NULL;
648 if (NULL != ai->report_block)
650 LOG (GNUNET_ERROR_TYPE_ERROR,
651 _("Double creation of proof-of-work, this should not happen.\n"));
656 LOG (GNUNET_ERROR_TYPE_ERROR,
657 _("Failed to create pow and signature block.\n"));
660 LOG (GNUNET_ERROR_TYPE_DEBUG, "Anomaly report POW block ready.\n");
662 GNUNET_memdup (block,
663 sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
665 ar_item = ai->reporting_queue_head;
666 while (NULL != ar_item)
668 GNUNET_CONTAINER_DLL_remove (ai->reporting_queue_head,
669 ai->reporting_queue_tail, ar_item);
670 do_send_anomaly_report (ar_item->dest_mq, ai, ar_item->type);
671 GNUNET_free (ar_item);
672 ar_item = ai->reporting_queue_head;
678 * When a change to the anomaly info of a sensor is made, this function should
679 * be called to create the message, its proof-of-work and signature, ready to
680 * be sent to other peers or the collection point.
682 * @param ai Anomaly Info struct
685 update_anomaly_report_pow_block (struct AnomalyInfo *ai)
687 struct GNUNET_SENSOR_AnomalyReportMessage *arm;
688 struct GNUNET_TIME_Absolute timestamp;
690 LOG (GNUNET_ERROR_TYPE_DEBUG,
691 "Updating anomaly report POW block due to data change.\n");
692 if (NULL != ai->report_block)
694 GNUNET_free (ai->report_block);
695 ai->report_block = NULL;
697 if (NULL != ai->report_creation_cx)
699 /* If a creation is already running, cancel it because the data changed */
700 GNUNET_SENSOR_crypto_pow_sign_cancel (ai->report_creation_cx);
701 ai->report_creation_cx = NULL;
703 arm = GNUNET_new (struct GNUNET_SENSOR_AnomalyReportMessage);
705 GNUNET_CRYPTO_hash (ai->sensor->name, strlen (ai->sensor->name) + 1,
706 &arm->sensorname_hash);
707 arm->sensorversion_major = htons (ai->sensor->version_major);
708 arm->sensorversion_minor = htons (ai->sensor->version_minor);
709 arm->anomalous = htons (ai->anomalous);
710 arm->anomalous_neighbors =
712 neighborhood) ? 0 : ((float) GNUNET_CONTAINER_multipeermap_size (ai->
713 anomalous_neighbors))
715 timestamp = GNUNET_TIME_absolute_get ();
716 ai->report_creation_cx =
717 GNUNET_SENSOR_crypto_pow_sign (arm,
719 GNUNET_SENSOR_AnomalyReportMessage),
720 &timestamp, &mypeerid.public_key,
721 private_key, pow_matching_bits,
722 &report_creation_cb, ai);
728 * Create a sensor value message from a given value info struct inside a MQ
731 * @param vi Value info struct to use
732 * @return Envelope with message
734 static struct GNUNET_MQ_Envelope *
735 create_value_message (struct ValueInfo *vi)
737 struct GNUNET_SENSOR_ValueMessage *vm;
738 struct GNUNET_MQ_Envelope *ev;
740 ev = GNUNET_MQ_msg_extra (vm, vi->last_value_size,
741 GNUNET_MESSAGE_TYPE_SENSOR_READING);
742 GNUNET_CRYPTO_hash (vi->sensor->name, strlen (vi->sensor->name) + 1,
743 &vm->sensorname_hash);
744 vm->sensorversion_major = htons (vi->sensor->version_major);
745 vm->sensorversion_minor = htons (vi->sensor->version_minor);
746 vm->timestamp = vi->last_value_timestamp;
747 vm->value_size = htons (vi->last_value_size);
748 memcpy (&vm[1], vi->last_value, vi->last_value_size);
753 /******************************************************************************/
754 /*************************** CORE Handlers ***************************/
755 /******************************************************************************/
759 * An inbound anomaly report is received from a peer through CORE.
761 * @param cls closure (unused)
762 * @param other the other peer involved
763 * @param message the actual message
764 * @return #GNUNET_OK to keep the connection open,
765 * #GNUNET_SYSERR to close connection to the peer (signal serious error)
768 handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other,
769 const struct GNUNET_MessageHeader *message)
771 struct GNUNET_SENSOR_crypto_pow_block *report_block;
772 struct GNUNET_SENSOR_AnomalyReportMessage *arm;
773 struct GNUNET_SENSOR_SensorInfo *sensor;
774 struct AnomalyInfo *my_anomaly_info;
775 struct CadetPeer *cadetp;
777 int peer_in_anomalous_list;
779 /* Verify proof-of-work, signature and extract report message */
780 report_block = (struct GNUNET_SENSOR_crypto_pow_block *) &message[1];
781 if (sizeof (struct GNUNET_SENSOR_AnomalyReportMessage) !=
782 GNUNET_SENSOR_crypto_verify_pow_sign (report_block, pow_matching_bits,
783 (struct GNUNET_CRYPTO_EddsaPublicKey
784 *) &other->public_key,
787 LOG (GNUNET_ERROR_TYPE_WARNING,
788 "Received invalid anomaly report from peer `%s'.\n",
791 return GNUNET_SYSERR;
793 /* Now we parse the content of the message */
794 sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &arm->sensorname_hash);
795 if (NULL == sensor ||
796 sensor->version_major != ntohs (arm->sensorversion_major) ||
797 sensor->version_minor != ntohs (arm->sensorversion_minor))
799 LOG (GNUNET_ERROR_TYPE_WARNING,
800 "I don't have the sensor reported by the peer `%s'.\n",
804 my_anomaly_info = get_anomaly_info_by_sensor (sensor);
805 GNUNET_assert (NULL != my_anomaly_info);
806 peer_in_anomalous_list =
807 GNUNET_CONTAINER_multipeermap_contains (my_anomaly_info->
808 anomalous_neighbors, other);
809 peer_anomalous = ntohs (arm->anomalous);
810 LOG (GNUNET_ERROR_TYPE_DEBUG,
811 "Received an anomaly update from neighbour `%s' (%d).\n",
812 GNUNET_i2s (other), peer_anomalous);
813 if (GNUNET_YES == peer_anomalous)
815 if (GNUNET_YES == peer_in_anomalous_list) /* repeated positive report */
818 GNUNET_CONTAINER_multipeermap_put (my_anomaly_info->anomalous_neighbors,
820 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
824 if (GNUNET_NO == peer_in_anomalous_list) /* repeated negative report */
827 GNUNET_CONTAINER_multipeermap_remove_all (my_anomaly_info->
828 anomalous_neighbors, other);
830 /* This is important to create an updated block since the data changed */
831 update_anomaly_report_pow_block (my_anomaly_info);
832 /* Send anomaly update to collection point only if I have the same anomaly */
833 if (GNUNET_YES == my_anomaly_info->anomalous &&
834 NULL != sensor->collection_point &&
835 GNUNET_YES == sensor->report_anomalies)
837 LOG (GNUNET_ERROR_TYPE_DEBUG,
838 "Neighbor update triggered sending anomaly report to collection point `%s'.\n",
839 GNUNET_i2s (sensor->collection_point));
840 cadetp = get_cadet_peer (*sensor->collection_point);
841 send_anomaly_report (cadetp->mq, my_anomaly_info, GNUNET_NO);
847 /******************************************************************************/
848 /************************ PEERSTORE callbacks ************************/
849 /******************************************************************************/
853 * Sensor value watch callback
855 * @param cls Closure, ValueInfo struct related to the sensor we are watching
856 * @param record PEERSTORE new record, NULL if error
857 * @param emsg Error message, NULL if no error
858 * @return GNUNET_YES to continue watching
861 value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
863 struct ValueInfo *vi = cls;
867 LOG (GNUNET_ERROR_TYPE_ERROR, _("PEERSTORE error: %s.\n"), emsg);
870 if (NULL != vi->last_value)
872 GNUNET_free (vi->last_value);
873 vi->last_value_size = 0;
875 vi->last_value = GNUNET_memdup (record->value, record->value_size);
876 vi->last_value_size = record->value_size;
877 vi->last_value_timestamp = GNUNET_TIME_absolute_get ();
878 vi->last_value_reported = GNUNET_NO;
883 /******************************************************************************/
884 /************************** CORE callbacks ***************************/
885 /******************************************************************************/
889 * Method called whenever a CORE peer disconnects.
891 * @param cls closure (unused)
892 * @param peer peer identity this notification is about
895 core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
897 struct CorePeer *corep;
899 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
901 LOG (GNUNET_ERROR_TYPE_DEBUG, "Core peer `%s' disconnected.\n",
905 while (NULL != corep)
907 if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, corep->peer_id))
909 GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
910 destroy_core_peer (corep);
919 * Method called whenever a given peer connects through CORE.
921 * @param cls closure (unused)
922 * @param peer peer identity this notification is about
925 core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
927 struct CorePeer *corep;
928 struct AnomalyInfo *ai;
930 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
932 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core peer `%s'.\n",
935 corep = GNUNET_new (struct CorePeer);
936 corep->peer_id = (struct GNUNET_PeerIdentity *) peer;
937 corep->mq = GNUNET_CORE_mq_create (core, peer);
938 GNUNET_CONTAINER_DLL_insert (corep_head, corep_tail, corep);
939 /* Send any locally anomalous sensors to the new peer */
943 if (GNUNET_YES == ai->anomalous)
945 LOG (GNUNET_ERROR_TYPE_DEBUG,
946 "Updating newly connected neighbor `%s' with anomalous sensor.\n",
948 send_anomaly_report (corep->mq, ai, GNUNET_YES);
956 * Function called after #GNUNET_CORE_connect has succeeded (or failed
957 * for good). Note that the private key of the peer is intentionally
958 * not exposed here; if you need it, your process should try to read
959 * the private key file directly (which should work if you are
960 * authorized...). Implementations of this function must not call
961 * #GNUNET_CORE_disconnect (other than by scheduling a new task to
964 * @param cls closure (unused)
965 * @param my_identity ID of this peer, NULL if we failed
968 core_startup_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity)
970 if (NULL == my_identity)
972 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
973 SENSOR_reporting_stop ();
976 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, my_identity))
978 LOG (GNUNET_ERROR_TYPE_ERROR,
979 _("Peer identity received from CORE init doesn't match ours.\n"));
980 SENSOR_reporting_stop ();
986 /******************************************************************************/
987 /************************* CADET callbacks ***************************/
988 /******************************************************************************/
991 * Function called whenever a channel is destroyed. Should clean up
992 * any associated state.
994 * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
996 * @param cls closure (set from #GNUNET_CADET_connect)
997 * @param channel connection to the other end (henceforth invalid)
998 * @param channel_ctx place where local state associated
999 * with the channel is stored
1002 cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel,
1005 struct CadetPeer *cadetp = channel_ctx;
1007 if (GNUNET_YES == cadetp->destroying)
1009 LOG (GNUNET_ERROR_TYPE_DEBUG,
1010 "CADET channel was destroyed by remote peer `%s' or failed to start.\n",
1011 GNUNET_i2s (&cadetp->peer_id));
1012 GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
1013 cadetp->channel = NULL;
1014 destroy_cadet_peer (cadetp);
1018 /******************************************************************************/
1019 /********************** Local anomaly receiver ***********************/
1020 /******************************************************************************/
1024 * Used by the analysis module to tell the reporting module about a change in
1025 * the anomaly status of a sensor.
1027 * @param sensor Related sensor
1028 * @param anomalous The new sensor anomalous status
1031 SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor,
1034 struct AnomalyInfo *ai;
1035 struct CorePeer *corep;
1036 struct CadetPeer *cadetp;
1038 if (GNUNET_NO == module_running)
1040 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received an external anomaly update.\n");
1041 ai = get_anomaly_info_by_sensor (sensor);
1042 GNUNET_assert (NULL != ai);
1043 ai->anomalous = anomalous;
1044 /* This is important to create an updated block since the data changed */
1045 update_anomaly_report_pow_block (ai);
1046 /* Report change to all neighbors */
1048 while (NULL != corep)
1050 LOG (GNUNET_ERROR_TYPE_DEBUG,
1051 "Sending an anomaly report to neighbor `%s'.\n",
1052 GNUNET_i2s (corep->peer_id));
1053 send_anomaly_report (corep->mq, ai, GNUNET_YES);
1054 corep = corep->next;
1056 /* Report change to collection point if needed */
1057 if (NULL != ai->sensor->collection_point &&
1058 GNUNET_YES == ai->sensor->report_anomalies)
1060 LOG (GNUNET_ERROR_TYPE_DEBUG,
1061 "Local anomaly update triggered sending anomaly report to collection point `%s'.\n",
1062 GNUNET_i2s (ai->sensor->collection_point));
1063 cadetp = get_cadet_peer (*ai->sensor->collection_point);
1064 send_anomaly_report (cadetp->mq, ai, GNUNET_NO);
1069 /******************************************************************************/
1070 /******************* Reporting values (periodic) *********************/
1071 /******************************************************************************/
1075 * Task scheduled to send values to collection point
1077 * @param cls closure, a `struct ValueInfo *`
1081 report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1083 struct ValueInfo *vi = cls;
1084 struct GNUNET_SENSOR_SensorInfo *sensor = vi->sensor;
1085 struct CadetPeer *cadetp;
1086 struct GNUNET_MQ_Envelope *ev;
1088 vi->reporting_task =
1089 GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
1091 if (0 == vi->last_value_size || GNUNET_YES == vi->last_value_reported)
1093 LOG (GNUNET_ERROR_TYPE_WARNING,
1094 "Did not receive a fresh value from `%s' to report.\n", sensor->name);
1097 LOG (GNUNET_ERROR_TYPE_DEBUG,
1098 "Now trying to report last seen value of `%s' to collection point.\n",
1100 cadetp = get_cadet_peer (*sensor->collection_point);
1101 ev = create_value_message (vi);
1102 GNUNET_MQ_send (cadetp->mq, ev);
1103 vi->last_value_reported = GNUNET_YES;
1107 /******************************************************************************/
1108 /******************************** INIT *******************************/
1109 /******************************************************************************/
1113 * Iterator over defined sensors; creates anomaly (and, if needed, value) info contexts
1117 * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information
1118 * @return #GNUNET_YES to continue iterations
1121 init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
1124 struct GNUNET_SENSOR_SensorInfo *sensor = value;
1125 struct AnomalyInfo *ai;
1126 struct ValueInfo *vi;
1128 /* Create sensor anomaly info context */
1129 ai = GNUNET_new (struct AnomalyInfo);
1131 ai->sensor = sensor;
1132 ai->anomalous = GNUNET_NO;
1133 ai->anomalous_neighbors =
1134 GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1135 ai->report_block = NULL;
1136 ai->report_creation_cx = NULL;
1137 GNUNET_CONTAINER_DLL_insert (ai_head, ai_tail, ai);
1138 /* Create sensor value info context (if needed to be reported) */
1139 if (NULL == sensor->collection_point || GNUNET_NO == sensor->report_values)
1141 LOG (GNUNET_ERROR_TYPE_INFO,
1142 "Reporting sensor `%s' values to collection point `%s' every %s.\n",
1143 sensor->name, GNUNET_i2s_full (sensor->collection_point),
1144 GNUNET_STRINGS_relative_time_to_string (sensor->value_reporting_interval,
1146 vi = GNUNET_new (struct ValueInfo);
1147 vi->sensor = sensor;
1148 vi->last_value = NULL;
1149 vi->last_value_size = 0;
1150 vi->last_value_reported = GNUNET_NO;
1152 GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
1153 &value_watch_cb, vi);
1154 vi->reporting_task =
1155 GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
1157 GNUNET_CONTAINER_DLL_insert (vi_head, vi_tail, vi);
1163 * Start the sensor anomaly reporting module
1165 * @param c our service configuration
1166 * @param s multihashmap of loaded sensors
1167 * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
1170 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
1171 struct GNUNET_CONTAINER_MultiHashMap *s)
1173 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1174 {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P,
1175 sizeof (struct GNUNET_MessageHeader) +
1176 sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
1177 sizeof (struct GNUNET_SENSOR_AnomalyReportMessage)},
1180 static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1184 LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n");
1185 GNUNET_assert (NULL != s);
1189 GNUNET_CONFIGURATION_get_value_number (cfg, "sensor-reporting",
1190 "POW_MATCHING_BITS",
1191 &pow_matching_bits))
1193 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "sensor-reporting",
1194 "POW_MATCHING_BITS");
1195 SENSOR_reporting_stop ();
1196 return GNUNET_SYSERR;
1198 if (pow_matching_bits > sizeof (struct GNUNET_HashCode))
1200 LOG (GNUNET_ERROR_TYPE_ERROR, "Matching bits value too large (%d > %d).\n",
1201 pow_matching_bits, sizeof (struct GNUNET_HashCode));
1202 SENSOR_reporting_stop ();
1203 return GNUNET_SYSERR;
1205 /* Connect to PEERSTORE */
1206 peerstore = GNUNET_PEERSTORE_connect (cfg);
1207 if (NULL == peerstore)
1209 LOG (GNUNET_ERROR_TYPE_ERROR,
1210 _("Failed to connect to peerstore service.\n"));
1211 SENSOR_reporting_stop ();
1212 return GNUNET_SYSERR;
1214 /* Connect to CORE */
1216 GNUNET_CORE_connect (cfg, NULL, &core_startup_cb, core_connect_cb,
1217 &core_disconnect_cb, NULL, GNUNET_YES, NULL,
1218 GNUNET_YES, core_handlers);
1221 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
1222 SENSOR_reporting_stop ();
1223 return GNUNET_SYSERR;
1225 /* Connect to CADET */
1227 GNUNET_CADET_connect (cfg, NULL, NULL, &cadet_channel_destroyed,
1228 cadet_handlers, NULL);
1231 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CADET service.\n"));
1232 SENSOR_reporting_stop ();
1233 return GNUNET_SYSERR;
1235 private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (cfg);
1236 if (NULL == private_key)
1238 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to load my private key.\n"));
1239 SENSOR_reporting_stop ();
1240 return GNUNET_SYSERR;
1242 GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid);
1243 GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL);
1245 module_running = GNUNET_YES;
1249 /* end of gnunet-service-sensor_reporting.c */