2 This file is part of GNUnet.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file sensor/gnunet-service-sensor_reporting.c
23 * @brief sensor service reporting functionality
24 * @author Omar Tarabai
27 #include "gnunet_util_lib.h"
29 #include "gnunet_peerstore_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
34 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
38 * When we are still generating a proof-of-work and we need to send an anomaly
39 * report, we queue them until the generation is complete
41 struct AnomalyReportingQueueItem
47 struct AnomalyReportingQueueItem *prev;
52 struct AnomalyReportingQueueItem *next;
55 * Message queue belonging to the peer that is the destination of the report
57 struct GNUNET_MQ_Handle *dest_mq;
72 struct AnomalyInfo *prev;
77 struct AnomalyInfo *next;
82 struct GNUNET_SENSOR_SensorInfo *sensor;
85 * Current anomalous status of sensor
90 * List of peers that reported an anomaly for this sensor
92 struct GNUNET_CONTAINER_MultiPeerMap *anomalous_neighbors;
95 * Report block with proof-of-work and signature
97 struct GNUNET_SENSOR_crypto_pow_block *report_block;
100 * Context of an operation creating pow and signature
102 struct GNUNET_SENSOR_crypto_pow_context *report_creation_cx;
105 * Head of the queue of pending report destinations
107 struct AnomalyReportingQueueItem *reporting_queue_head;
110 * Head of the queue of pending report destinations
112 struct AnomalyReportingQueueItem *reporting_queue_tail;
122 struct ValueInfo *prev;
127 struct ValueInfo *next;
132 struct GNUNET_SENSOR_SensorInfo *sensor;
135 * Last value read from sensor
140 * Size of @e last_value
142 size_t last_value_size;
145 * Timestamp of last value reading
147 struct GNUNET_TIME_Absolute last_value_timestamp;
150 * Has the last value seen already been reported to collection point?
152 int last_value_reported;
155 * Watcher of sensor values
157 struct GNUNET_PEERSTORE_WatchContext *wc;
160 * Collection point reporting task (or #GNUNET_SCHEDULER_NO_TASK)
162 GNUNET_SCHEDULER_TaskIdentifier reporting_task;
167 * Information about a connected CORE peer.
168 * Note that we only know about a connected peer if it is running the same
169 * application (sensor anomaly reporting) as us.
177 struct CorePeer *prev;
182 struct CorePeer *next;
185 * Peer identity of connected peer
187 struct GNUNET_PeerIdentity *peer_id;
190 * Message queue for messages to be sent to this peer
192 struct GNUNET_MQ_Handle *mq;
197 * Information about a connected CADET peer (collection point).
205 struct CadetPeer *prev;
210 struct CadetPeer *next;
215 struct GNUNET_PeerIdentity peer_id;
218 * CADET channel handle
220 struct GNUNET_CADET_Channel *channel;
223 * Message queue for messages to be sent to this peer
225 struct GNUNET_MQ_Handle *mq;
228 * Are we currently destroying the channel and its context?
238 static const struct GNUNET_CONFIGURATION_Handle *cfg;
241 * Multihashmap of loaded sensors
243 static struct GNUNET_CONTAINER_MultiHashMap *sensors;
246 * Handle to peerstore service
248 static struct GNUNET_PEERSTORE_Handle *peerstore;
251 * Handle to core service
253 static struct GNUNET_CORE_Handle *core;
256 * Handle to CADET service
258 static struct GNUNET_CADET_Handle *cadet;
263 static struct GNUNET_PeerIdentity mypeerid;
268 static struct GNUNET_CRYPTO_EddsaPrivateKey *private_key;
271 * Head of DLL of anomaly info structs
273 static struct AnomalyInfo *ai_head;
276 * Tail of DLL of anomaly info structs
278 static struct AnomalyInfo *ai_tail;
281 * Head of DLL of value info structs
283 static struct ValueInfo *vi_head;
286 * Tail of DLL of value info structs
288 static struct ValueInfo *vi_tail;
291 * Head of DLL of CORE peers
293 static struct CorePeer *corep_head;
296 * Tail of DLL of CORE peers
298 static struct CorePeer *corep_tail;
301 * Head of DLL of CADET peers
303 static struct CadetPeer *cadetp_head;
306 * Tail of DLL of CADET peers
308 static struct CadetPeer *cadetp_tail;
311 * Is the module started?
313 static int module_running = GNUNET_NO;
316 * Number of known neighborhood peers
318 static int neighborhood;
321 * Parameter that defines the complexity of the proof-of-work
323 static long long unsigned int pow_matching_bits;
327 /******************************************************************************/
328 /****************************** CLEANUP ******************************/
329 /******************************************************************************/
332 * Destroy anomaly info struct
334 * @param ai struct to destroy
337 destroy_anomaly_info (struct AnomalyInfo *ai)
339 struct AnomalyReportingQueueItem *ar_item;
341 ar_item = ai->reporting_queue_head;
342 while (NULL != ar_item)
344 GNUNET_CONTAINER_DLL_remove (ai->reporting_queue_head,
345 ai->reporting_queue_tail, ar_item);
346 GNUNET_free (ar_item);
347 ar_item = ai->reporting_queue_head;
349 if (NULL != ai->report_creation_cx)
351 GNUNET_SENSOR_crypto_pow_sign_cancel (ai->report_creation_cx);
352 ai->report_creation_cx = NULL;
354 if (NULL != ai->report_block)
356 GNUNET_free (ai->report_block);
357 ai->report_block = NULL;
359 if (NULL != ai->anomalous_neighbors)
361 GNUNET_CONTAINER_multipeermap_destroy (ai->anomalous_neighbors);
362 ai->anomalous_neighbors = NULL;
369 * Destroy value info struct
371 * @param vi struct to destroy
374 destroy_value_info (struct ValueInfo *vi)
378 GNUNET_PEERSTORE_watch_cancel (vi->wc);
381 if (GNUNET_SCHEDULER_NO_TASK != vi->reporting_task)
383 GNUNET_SCHEDULER_cancel (vi->reporting_task);
384 vi->reporting_task = GNUNET_SCHEDULER_NO_TASK;
386 if (NULL != vi->last_value)
388 GNUNET_free (vi->last_value);
389 vi->last_value = NULL;
396 * Destroy core peer struct
398 * @param corep struct to destroy
401 destroy_core_peer (struct CorePeer *corep)
403 struct AnomalyInfo *ai;
405 if (NULL != corep->mq)
407 GNUNET_MQ_destroy (corep->mq);
413 GNUNET_assert (NULL != ai->anomalous_neighbors);
414 GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors,
423 * Destroy cadet peer struct
425 * @param cadetp struct to destroy
428 destroy_cadet_peer (struct CadetPeer *cadetp)
430 cadetp->destroying = GNUNET_YES;
431 if (NULL != cadetp->mq)
433 GNUNET_MQ_destroy (cadetp->mq);
436 if (NULL != cadetp->channel)
438 GNUNET_CADET_channel_destroy (cadetp->channel);
439 cadetp->channel = NULL;
441 GNUNET_free (cadetp);
446 * Stop sensor reporting module
449 SENSOR_reporting_stop ()
451 struct ValueInfo *vi;
452 struct CorePeer *corep;
453 struct AnomalyInfo *ai;
454 struct CadetPeer *cadetp;
456 LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor anomaly reporting module.\n");
457 module_running = GNUNET_NO;
459 /* Destroy value info's */
463 GNUNET_CONTAINER_DLL_remove (vi_head, vi_tail, vi);
464 destroy_value_info (vi);
467 /* Destroy core peers */
469 while (NULL != corep)
471 GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
472 destroy_core_peer (corep);
475 /* Destroy anomaly info's */
479 GNUNET_CONTAINER_DLL_remove (ai_head, ai_tail, ai);
480 destroy_anomaly_info (ai);
483 /* Destroy cadet peers */
484 cadetp = cadetp_head;
485 while (NULL != cadetp)
487 GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
488 destroy_cadet_peer (cadetp);
489 cadetp = cadetp_head;
491 /* Disconnect from other services */
494 GNUNET_CORE_disconnect (core);
497 if (NULL != peerstore)
499 GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_NO);
504 GNUNET_CADET_disconnect (cadet);
510 /******************************************************************************/
511 /****************************** HELPERS ******************************/
512 /******************************************************************************/
516 * Gets the anomaly info struct related to the given sensor
518 * @param sensor Sensor to search by
520 static struct AnomalyInfo *
521 get_anomaly_info_by_sensor (struct GNUNET_SENSOR_SensorInfo *sensor)
523 struct AnomalyInfo *ai;
528 if (ai->sensor == sensor)
539 * Returns context of a connected CADET peer.
540 * Creates it first if didn't exist before.
542 * @param pid Peer Identity
543 * @return Context of connected CADET peer
545 static struct CadetPeer *
546 get_cadet_peer (struct GNUNET_PeerIdentity pid)
548 struct CadetPeer *cadetp;
550 cadetp = cadetp_head;
551 while (NULL != cadetp)
553 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cadetp->peer_id))
555 cadetp = cadetp->next;
557 LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating a CADET connection to peer `%s'.\n",
559 /* Not found, create struct and channel */
560 cadetp = GNUNET_new (struct CadetPeer);
561 cadetp->peer_id = pid;
563 GNUNET_CADET_channel_create (cadet, cadetp, &pid,
564 GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
565 GNUNET_CADET_OPTION_RELIABLE);
566 cadetp->mq = GNUNET_CADET_mq_create (cadetp->channel);
567 GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp);
573 * This function is called only when we have a block ready and want to send it
574 * to the given peer (represented by its message queue)
576 * @param mq Message queue to put the message in
577 * @param ai Anomaly info to report
578 * @param type Message type
581 do_send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai,
584 struct GNUNET_MessageHeader *msg;
585 struct GNUNET_MQ_Envelope *ev;
588 GNUNET_assert (NULL != ai->report_block);
590 sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
591 ai->report_block->msg_size;
592 ev = GNUNET_MQ_msg_header_extra (msg, block_size, type);
593 memcpy (&msg[1], ai->report_block, block_size);
594 GNUNET_MQ_send (mq, ev);
599 * Check if we have signed and proof-of-work block ready.
600 * If yes, we send the report directly, if no, we enqueue the reporting until
601 * the block is ready.
603 * @param mq Message queue to put the message in
604 * @param ai Anomaly info to report
605 * @param p2p Is the report sent to a neighboring peer
608 send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai,
611 struct AnomalyReportingQueueItem *ar_item;
616 p2p) ? GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P :
617 GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT;
618 if (NULL == ai->report_block)
620 ar_item = GNUNET_new (struct AnomalyReportingQueueItem);
622 ar_item->dest_mq = mq;
623 ar_item->type = type;
624 GNUNET_CONTAINER_DLL_insert_tail (ai->reporting_queue_head,
625 ai->reporting_queue_tail, ar_item);
629 do_send_anomaly_report (mq, ai, type);
635 * Callback when the crypto module finished created proof-of-work and signature
636 * for an anomaly report.
638 * @param cls Closure, a `struct AnomalyInfo *`
639 * @param block The resulting block, NULL on error
642 report_creation_cb (void *cls, struct GNUNET_SENSOR_crypto_pow_block *block)
644 struct AnomalyInfo *ai = cls;
645 struct AnomalyReportingQueueItem *ar_item;
647 ai->report_creation_cx = NULL;
648 if (NULL != ai->report_block)
650 LOG (GNUNET_ERROR_TYPE_ERROR,
651 _("Double creation of proof-of-work, this should not happen.\n"));
656 LOG (GNUNET_ERROR_TYPE_ERROR,
657 _("Failed to create pow and signature block.\n"));
660 LOG (GNUNET_ERROR_TYPE_DEBUG, "Anomaly report POW block ready.\n");
661 ai->report_block = block;
662 ar_item = ai->reporting_queue_head;
663 while (NULL != ar_item)
665 GNUNET_CONTAINER_DLL_remove (ai->reporting_queue_head,
666 ai->reporting_queue_tail, ar_item);
667 do_send_anomaly_report (ar_item->dest_mq, ai, ar_item->type);
668 GNUNET_free (ar_item);
669 ar_item = ai->reporting_queue_head;
675 * When a change to the anomaly info of a sensor is done, this function should
676 * be called to create the message, its proof-of-work and signuature ready to
677 * be sent to other peers or collection point.
679 * @param ai Anomaly Info struct
682 update_anomaly_report_pow_block (struct AnomalyInfo *ai)
684 struct GNUNET_SENSOR_AnomalyReportMessage *arm;
685 struct GNUNET_TIME_Absolute timestamp;
687 LOG (GNUNET_ERROR_TYPE_DEBUG,
688 "Updating anomaly report POW block due to data change.\n");
689 if (NULL != ai->report_block)
691 GNUNET_free (ai->report_block);
692 ai->report_block = NULL;
694 if (NULL != ai->report_creation_cx)
696 /* If a creation is already running, cancel it because the data changed */
697 GNUNET_SENSOR_crypto_pow_sign_cancel (ai->report_creation_cx);
698 ai->report_creation_cx = NULL;
700 arm = GNUNET_new (struct GNUNET_SENSOR_AnomalyReportMessage);
702 GNUNET_CRYPTO_hash (ai->sensor->name, strlen (ai->sensor->name) + 1,
703 &arm->sensorname_hash);
704 arm->sensorversion_major = htons (ai->sensor->version_major);
705 arm->sensorversion_minor = htons (ai->sensor->version_minor);
706 arm->anomalous = htons (ai->anomalous);
707 arm->anomalous_neighbors =
709 neighborhood) ? 0 : ((float)
710 GNUNET_CONTAINER_multipeermap_size
711 (ai->anomalous_neighbors)) / neighborhood;
712 timestamp = GNUNET_TIME_absolute_get ();
713 ai->report_creation_cx =
714 GNUNET_SENSOR_crypto_pow_sign (arm,
716 GNUNET_SENSOR_AnomalyReportMessage),
717 ×tamp, &mypeerid.public_key,
718 private_key, pow_matching_bits,
719 &report_creation_cb, ai);
724 * Create a sensor value message from a given value info struct inside a MQ
727 * @param vi Value info struct to use
728 * @return Envelope with message
730 static struct GNUNET_MQ_Envelope *
731 create_value_message (struct ValueInfo *vi)
733 struct GNUNET_SENSOR_ValueMessage *vm;
734 struct GNUNET_MQ_Envelope *ev;
736 ev = GNUNET_MQ_msg_extra (vm, vi->last_value_size,
737 GNUNET_MESSAGE_TYPE_SENSOR_READING);
738 GNUNET_CRYPTO_hash (vi->sensor->name, strlen (vi->sensor->name) + 1,
739 &vm->sensorname_hash);
740 vm->sensorversion_major = htons (vi->sensor->version_major);
741 vm->sensorversion_minor = htons (vi->sensor->version_minor);
742 vm->timestamp = vi->last_value_timestamp;
743 vm->value_size = htons (vi->last_value_size);
744 memcpy (&vm[1], vi->last_value, vi->last_value_size);
749 /******************************************************************************/
750 /*************************** CORE Handlers ***************************/
751 /******************************************************************************/
755 * An inbound anomaly report is received from a peer through CORE.
757 * @param cls closure (unused)
758 * @param peer the other peer involved
759 * @param message the actual message
760 * @return #GNUNET_OK to keep the connection open,
761 * #GNUNET_SYSERR to close connection to the peer (signal serious error)
764 handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other,
765 const struct GNUNET_MessageHeader *message)
767 struct GNUNET_SENSOR_crypto_pow_block *report_block;
768 struct GNUNET_SENSOR_AnomalyReportMessage *arm;
769 struct GNUNET_SENSOR_SensorInfo *sensor;
770 struct AnomalyInfo *my_anomaly_info;
771 struct CadetPeer *cadetp;
773 int peer_in_anomalous_list;
775 /* Verify proof-of-work, signature and extract report message */
776 report_block = (struct GNUNET_SENSOR_crypto_pow_block *) &message[1];
777 if (sizeof (struct GNUNET_SENSOR_AnomalyReportMessage) !=
778 GNUNET_SENSOR_crypto_verify_pow_sign (report_block, pow_matching_bits,
779 (struct GNUNET_CRYPTO_EddsaPublicKey
780 *) &other->public_key,
783 LOG (GNUNET_ERROR_TYPE_WARNING,
784 "Received invalid anomaly report from peer `%s'.\n",
787 return GNUNET_SYSERR;
789 /* Now we parse the content of the message */
790 sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &arm->sensorname_hash);
791 if (NULL == sensor ||
792 sensor->version_major != ntohs (arm->sensorversion_major) ||
793 sensor->version_minor != ntohs (arm->sensorversion_minor))
795 LOG (GNUNET_ERROR_TYPE_WARNING,
796 "I don't have the sensor reported by the peer `%s'.\n",
800 my_anomaly_info = get_anomaly_info_by_sensor (sensor);
801 GNUNET_assert (NULL != my_anomaly_info);
802 peer_in_anomalous_list =
803 GNUNET_CONTAINER_multipeermap_contains
804 (my_anomaly_info->anomalous_neighbors, other);
805 peer_anomalous = ntohs (arm->anomalous);
806 LOG (GNUNET_ERROR_TYPE_DEBUG,
807 "Received an anomaly update from neighbour `%s' (%d).\n",
808 GNUNET_i2s (other), peer_anomalous);
809 if (GNUNET_YES == peer_anomalous)
811 if (GNUNET_YES == peer_in_anomalous_list) /* repeated positive report */
814 GNUNET_CONTAINER_multipeermap_put (my_anomaly_info->anomalous_neighbors,
816 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
820 if (GNUNET_NO == peer_in_anomalous_list) /* repeated negative report */
823 GNUNET_CONTAINER_multipeermap_remove_all
824 (my_anomaly_info->anomalous_neighbors, other);
826 /* This is important to create an updated block since the data changed */
827 update_anomaly_report_pow_block (my_anomaly_info);
828 /* Send anomaly update to collection point only if I have the same anomaly */
829 if (GNUNET_YES == my_anomaly_info->anomalous &&
830 NULL != sensor->collection_point &&
831 GNUNET_YES == sensor->report_anomalies)
833 LOG (GNUNET_ERROR_TYPE_DEBUG,
834 "Neighbor update triggered sending anomaly report to collection point `%s'.\n",
835 GNUNET_i2s (sensor->collection_point));
836 cadetp = get_cadet_peer (*sensor->collection_point);
837 send_anomaly_report (cadetp->mq, my_anomaly_info, GNUNET_NO);
843 /******************************************************************************/
844 /************************ PEERSTORE callbacks ************************/
845 /******************************************************************************/
849 * Sensor value watch callback
851 * @param cls Closure, ValueInfo struct related to the sensor we are watching
852 * @param record PEERSTORE new record, NULL if error
853 * @param emsg Error message, NULL if no error
854 * @return GNUNET_YES to continue watching
857 value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
859 struct ValueInfo *vi = cls;
863 LOG (GNUNET_ERROR_TYPE_ERROR, _("PEERSTORE error: %s.\n"), emsg);
866 if (NULL != vi->last_value)
868 GNUNET_free (vi->last_value);
869 vi->last_value_size = 0;
871 vi->last_value = GNUNET_memdup (record->value, record->value_size);
872 vi->last_value_size = record->value_size;
873 vi->last_value_timestamp = GNUNET_TIME_absolute_get ();
874 vi->last_value_reported = GNUNET_NO;
879 /******************************************************************************/
880 /************************** CORE callbacks ***************************/
881 /******************************************************************************/
885 * Method called whenever a CORE peer disconnects.
887 * @param cls closure (unused)
888 * @param peer peer identity this notification is about
891 core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
893 struct CorePeer *corep;
895 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
897 LOG (GNUNET_ERROR_TYPE_DEBUG, "Core peer `%s' disconnected.\n",
901 while (NULL != corep)
903 if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, corep->peer_id))
905 GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
906 destroy_core_peer (corep);
915 * Method called whenever a given peer connects through CORE.
917 * @param cls closure (unused)
918 * @param peer peer identity this notification is about
921 core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
923 struct CorePeer *corep;
924 struct AnomalyInfo *ai;
926 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
928 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core peer `%s'.\n",
931 corep = GNUNET_new (struct CorePeer);
932 corep->peer_id = (struct GNUNET_PeerIdentity *) peer;
933 corep->mq = GNUNET_CORE_mq_create (core, peer);
934 GNUNET_CONTAINER_DLL_insert (corep_head, corep_tail, corep);
935 /* Send any locally anomalous sensors to the new peer */
939 if (GNUNET_YES == ai->anomalous)
941 LOG (GNUNET_ERROR_TYPE_DEBUG,
942 "Updating newly connected neighbor `%s' with anomalous sensor.\n",
944 send_anomaly_report (corep->mq, ai, GNUNET_YES);
952 * Function called after #GNUNET_CORE_connect has succeeded (or failed
953 * for good). Note that the private key of the peer is intentionally
954 * not exposed here; if you need it, your process should try to read
955 * the private key file directly (which should work if you are
956 * authorized...). Implementations of this function must not call
957 * #GNUNET_CORE_disconnect (other than by scheduling a new task to
960 * @param cls closure (unused)
961 * @param my_identity ID of this peer, NULL if we failed
964 core_startup_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity)
966 if (NULL == my_identity)
968 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
969 SENSOR_reporting_stop ();
972 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, my_identity))
974 LOG (GNUNET_ERROR_TYPE_ERROR,
975 _("Peer identity received from CORE init doesn't match ours.\n"));
976 SENSOR_reporting_stop ();
982 /******************************************************************************/
983 /************************* CADET callbacks ***************************/
984 /******************************************************************************/
987 * Function called whenever a channel is destroyed. Should clean up
988 * any associated state.
990 * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
992 * @param cls closure (set from #GNUNET_CADET_connect)
993 * @param channel connection to the other end (henceforth invalid)
994 * @param channel_ctx place where local state associated
995 * with the channel is stored
998 cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel,
1001 struct CadetPeer *cadetp = channel_ctx;
1003 if (GNUNET_YES == cadetp->destroying)
1005 LOG (GNUNET_ERROR_TYPE_DEBUG,
1006 "CADET channel was destroyed by remote peer `%s' or failed to start.\n",
1007 GNUNET_i2s (&cadetp->peer_id));
1008 GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
1009 cadetp->channel = NULL;
1010 destroy_cadet_peer (cadetp);
1014 /******************************************************************************/
1015 /********************** Local anomaly receiver ***********************/
1016 /******************************************************************************/
1020 * Used by the analysis module to tell the reporting module about a change in
1021 * the anomaly status of a sensor.
1023 * @param sensor Related sensor
1024 * @param anomalous The new sensor anomalous status
1027 SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor,
1030 struct AnomalyInfo *ai;
1031 struct CorePeer *corep;
1032 struct CadetPeer *cadetp;
1034 if (GNUNET_NO == module_running)
1036 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received an external anomaly update.\n");
1037 ai = get_anomaly_info_by_sensor (sensor);
1038 GNUNET_assert (NULL != ai);
1039 ai->anomalous = anomalous;
1040 /* This is important to create an updated block since the data changed */
1041 update_anomaly_report_pow_block (ai);
1042 /* Report change to all neighbors */
1044 while (NULL != corep)
1046 LOG (GNUNET_ERROR_TYPE_DEBUG,
1047 "Sending an anomaly report to neighbor `%s'.\n",
1048 GNUNET_i2s (corep->peer_id));
1049 send_anomaly_report (corep->mq, ai, GNUNET_YES);
1050 corep = corep->next;
1052 /* Report change to collection point if need */
1053 if (NULL != ai->sensor->collection_point &&
1054 GNUNET_YES == ai->sensor->report_anomalies)
1056 LOG (GNUNET_ERROR_TYPE_DEBUG,
1057 "Local anomaly update triggered sending anomaly report to collection point `%s'.\n",
1058 GNUNET_i2s (ai->sensor->collection_point));
1059 cadetp = get_cadet_peer (*ai->sensor->collection_point);
1060 send_anomaly_report (cadetp->mq, ai, GNUNET_NO);
1065 /******************************************************************************/
1066 /******************* Reporting values (periodic) *********************/
1067 /******************************************************************************/
1071 * Task scheduled to send values to collection point
1073 * @param cls closure, a `struct ValueReportingContext *`
1077 report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1079 struct ValueInfo *vi = cls;
1080 struct GNUNET_SENSOR_SensorInfo *sensor = vi->sensor;
1081 struct CadetPeer *cadetp;
1082 struct GNUNET_MQ_Envelope *ev;
1084 vi->reporting_task =
1085 GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
1087 if (0 == vi->last_value_size || GNUNET_YES == vi->last_value_reported)
1089 LOG (GNUNET_ERROR_TYPE_WARNING,
1090 "Did not receive a fresh value from `%s' to report.\n", sensor->name);
1093 LOG (GNUNET_ERROR_TYPE_DEBUG,
1094 "Now trying to report last seen value of `%s' to collection point.\n",
1096 cadetp = get_cadet_peer (*sensor->collection_point);
1097 ev = create_value_message (vi);
1098 GNUNET_MQ_send (cadetp->mq, ev);
1099 vi->last_value_reported = GNUNET_YES;
1103 /******************************************************************************/
1104 /******************************** INIT *******************************/
1105 /******************************************************************************/
1109 * Iterator for defined sensors and creates anomaly info context
1113 * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information
1114 * @return #GNUNET_YES to continue iterations
1117 init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
1120 struct GNUNET_SENSOR_SensorInfo *sensor = value;
1121 struct AnomalyInfo *ai;
1122 struct ValueInfo *vi;
1124 /* Create sensor anomaly info context */
1125 ai = GNUNET_new (struct AnomalyInfo);
1127 ai->sensor = sensor;
1128 ai->anomalous = GNUNET_NO;
1129 ai->anomalous_neighbors =
1130 GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1131 ai->report_block = NULL;
1132 ai->report_creation_cx = NULL;
1133 GNUNET_CONTAINER_DLL_insert (ai_head, ai_tail, ai);
1134 /* Create sensor value info context (if needed to be reported) */
1135 if (NULL == sensor->collection_point || GNUNET_NO == sensor->report_values)
1137 LOG (GNUNET_ERROR_TYPE_INFO,
1138 "Reporting sensor `%s' values to collection point `%s' every %s.\n",
1139 sensor->name, GNUNET_i2s_full (sensor->collection_point),
1140 GNUNET_STRINGS_relative_time_to_string (sensor->value_reporting_interval,
1142 vi = GNUNET_new (struct ValueInfo);
1143 vi->sensor = sensor;
1144 vi->last_value = NULL;
1145 vi->last_value_size = 0;
1146 vi->last_value_reported = GNUNET_NO;
1148 GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
1149 &value_watch_cb, vi);
1150 vi->reporting_task =
1151 GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
1153 GNUNET_CONTAINER_DLL_insert (vi_head, vi_tail, vi);
1159 * Start the sensor anomaly reporting module
1161 * @param c our service configuration
1162 * @param s multihashmap of loaded sensors
1163 * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
1166 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
1167 struct GNUNET_CONTAINER_MultiHashMap *s)
1169 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1170 {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P,
1171 sizeof (struct GNUNET_MessageHeader) +
1172 sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
1173 sizeof (struct GNUNET_SENSOR_AnomalyReportMessage)},
1176 static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1180 LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n");
1181 GNUNET_assert (NULL != s);
1185 GNUNET_CONFIGURATION_get_value_number (cfg, "sensor-reporting",
1186 "POW_MATCHING_BITS",
1187 &pow_matching_bits))
1189 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "sensor-reporting",
1190 "POW_MATCHING_BITS");
1191 SENSOR_reporting_stop ();
1192 return GNUNET_SYSERR;
1194 if (pow_matching_bits > sizeof (struct GNUNET_HashCode))
1196 LOG (GNUNET_ERROR_TYPE_ERROR, "Matching bits value too large (%d > %d).\n",
1197 pow_matching_bits, sizeof (struct GNUNET_HashCode));
1198 SENSOR_reporting_stop ();
1199 return GNUNET_SYSERR;
1201 /* Connect to PEERSTORE */
1202 peerstore = GNUNET_PEERSTORE_connect (cfg);
1203 if (NULL == peerstore)
1205 LOG (GNUNET_ERROR_TYPE_ERROR,
1206 _("Failed to connect to peerstore service.\n"));
1207 SENSOR_reporting_stop ();
1208 return GNUNET_SYSERR;
1210 /* Connect to CORE */
1212 GNUNET_CORE_connect (cfg, NULL, &core_startup_cb, core_connect_cb,
1213 &core_disconnect_cb, NULL, GNUNET_YES, NULL,
1214 GNUNET_YES, core_handlers);
1217 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
1218 SENSOR_reporting_stop ();
1219 return GNUNET_SYSERR;
1221 /* Connect to CADET */
1223 GNUNET_CADET_connect (cfg, NULL, NULL, &cadet_channel_destroyed,
1224 cadet_handlers, NULL);
1227 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CADET service.\n"));
1228 SENSOR_reporting_stop ();
1229 return GNUNET_SYSERR;
1231 private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (cfg);
1232 if (NULL == private_key)
1234 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to load my private key.\n"));
1235 SENSOR_reporting_stop ();
1236 return GNUNET_SYSERR;
1238 GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid);
1239 GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL);
1241 module_running = GNUNET_YES;
1245 /* end of gnunet-service-sensor_reporting.c */