2 This file is part of GNUnet.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file sensor/gnunet-service-sensor_reporting.c
23 * @brief sensor service reporting functionality
24 * @author Omar Tarabai
27 #include "gnunet_util_lib.h"
29 #include "gnunet_peerstore_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
34 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
37 * Retry time when failing to connect to collection point
39 #define CP_RETRY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1)
43 * When we are still generating a proof-of-work and we need to send an anomaly
44 * report, we queue it until the generation is complete
46 struct AnomalyReportingQueueItem
52 struct AnomalyReportingQueueItem *prev;
57 struct AnomalyReportingQueueItem *next;
60 * Message queue belonging to the peer that is the destination of the report
62 struct GNUNET_MQ_Handle *dest_mq;
77 struct AnomalyInfo *prev;
82 struct AnomalyInfo *next;
87 struct GNUNET_SENSOR_SensorInfo *sensor;
90 * Current anomalous status of sensor
95 * List of peers that reported an anomaly for this sensor
97 struct GNUNET_CONTAINER_MultiPeerMap *anomalous_neighbors;
100 * Report block with proof-of-work and signature
102 struct GNUNET_SENSOR_crypto_pow_block *report_block;
105 * Context of an operation creating pow and signature
107 struct GNUNET_SENSOR_crypto_pow_context *report_creation_cx;
110 * Head of the queue of pending report destinations
112 struct AnomalyReportingQueueItem *reporting_queue_head;
115 * Tail of the queue of pending report destinations
117 struct AnomalyReportingQueueItem *reporting_queue_tail;
127 struct ValueInfo *prev;
132 struct ValueInfo *next;
137 struct GNUNET_SENSOR_SensorInfo *sensor;
140 * Last value read from sensor
145 * Size of @e last_value
147 size_t last_value_size;
150 * Timestamp of last value reading
152 struct GNUNET_TIME_Absolute last_value_timestamp;
155 * Has the last value seen already been reported to collection point?
157 int last_value_reported;
160 * Watcher of sensor values
162 struct GNUNET_PEERSTORE_WatchContext *wc;
165 * Collection point reporting task (or NULL)
167 struct GNUNET_SCHEDULER_Task *reporting_task;
172 * Information about a connected CORE peer.
173 * Note that we only know about a connected peer if it is running the same
174 * application (sensor anomaly reporting) as us.
182 struct CorePeer *prev;
187 struct CorePeer *next;
190 * Peer identity of connected peer
192 struct GNUNET_PeerIdentity *peer_id;
195 * Message queue for messages to be sent to this peer
197 struct GNUNET_MQ_Handle *mq;
202 * Information about a connected CADET peer (collection point).
210 struct CadetPeer *prev;
215 struct CadetPeer *next;
220 struct GNUNET_PeerIdentity peer_id;
223 * CADET channel handle
225 struct GNUNET_CADET_Channel *channel;
228 * Message queue for messages to be sent to this peer
230 struct GNUNET_MQ_Handle *mq;
233 * CADET transmit handle
235 struct GNUNET_CADET_TransmitHandle *th;
238 * Task used to try reconnection to collection point after failure
240 struct GNUNET_SCHEDULER_Task * reconnect_task;
243 * Are we currently destroying the channel and its context?
253 static const struct GNUNET_CONFIGURATION_Handle *cfg;
256 * Multihashmap of loaded sensors
258 static struct GNUNET_CONTAINER_MultiHashMap *sensors;
261 * Handle to peerstore service
263 static struct GNUNET_PEERSTORE_Handle *peerstore;
266 * Handle to core service
268 static struct GNUNET_CORE_Handle *core;
271 * Handle to CADET service
273 static struct GNUNET_CADET_Handle *cadet;
278 static struct GNUNET_PeerIdentity mypeerid;
283 static struct GNUNET_CRYPTO_EddsaPrivateKey *private_key;
286 * Head of DLL of anomaly info structs
288 static struct AnomalyInfo *ai_head;
291 * Tail of DLL of anomaly info structs
293 static struct AnomalyInfo *ai_tail;
296 * Head of DLL of value info structs
298 static struct ValueInfo *vi_head;
301 * Tail of DLL of value info structs
303 static struct ValueInfo *vi_tail;
306 * Head of DLL of CORE peers
308 static struct CorePeer *corep_head;
311 * Tail of DLL of CORE peers
313 static struct CorePeer *corep_tail;
316 * Head of DLL of CADET peers
318 static struct CadetPeer *cadetp_head;
321 * Tail of DLL of CADET peers
323 static struct CadetPeer *cadetp_tail;
326 * Is the module started?
328 static int module_running = GNUNET_NO;
331 * Number of known neighborhood peers
333 static int neighborhood;
336 * Parameter that defines the complexity of the proof-of-work
338 static long long unsigned int pow_matching_bits;
343 * Try reconnecting to collection point and send last queued message
346 cp_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
349 /******************************************************************************/
350 /****************************** CLEANUP ******************************/
351 /******************************************************************************/
354 * Destroy anomaly info struct
356 * @param ai struct to destroy
359 destroy_anomaly_info (struct AnomalyInfo *ai)
361 struct AnomalyReportingQueueItem *ar_item;
363 ar_item = ai->reporting_queue_head;
364 while (NULL != ar_item)
366 GNUNET_CONTAINER_DLL_remove (ai->reporting_queue_head,
367 ai->reporting_queue_tail, ar_item);
368 GNUNET_free (ar_item);
369 ar_item = ai->reporting_queue_head;
371 if (NULL != ai->report_creation_cx)
373 GNUNET_SENSOR_crypto_pow_sign_cancel (ai->report_creation_cx);
374 ai->report_creation_cx = NULL;
376 if (NULL != ai->report_block)
378 GNUNET_free (ai->report_block);
379 ai->report_block = NULL;
381 if (NULL != ai->anomalous_neighbors)
383 GNUNET_CONTAINER_multipeermap_destroy (ai->anomalous_neighbors);
384 ai->anomalous_neighbors = NULL;
391 * Destroy value info struct
393 * @param vi struct to destroy
396 destroy_value_info (struct ValueInfo *vi)
400 GNUNET_PEERSTORE_watch_cancel (vi->wc);
403 if (NULL != vi->reporting_task)
405 GNUNET_SCHEDULER_cancel (vi->reporting_task);
406 vi->reporting_task = NULL;
408 if (NULL != vi->last_value)
410 GNUNET_free (vi->last_value);
411 vi->last_value = NULL;
418 * Destroy core peer struct
420 * @param corep struct to destroy
423 destroy_core_peer (struct CorePeer *corep)
425 struct AnomalyInfo *ai;
426 struct AnomalyReportingQueueItem *ar_item;
431 GNUNET_assert (NULL != ai->anomalous_neighbors);
432 GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors,
434 /* Remove the core peer from any reporting queues */
435 ar_item = ai->reporting_queue_head;
436 while (NULL != ar_item)
438 if (ar_item->dest_mq == corep->mq)
440 GNUNET_CONTAINER_DLL_remove (ai->reporting_queue_head,
441 ai->reporting_queue_tail, ar_item);
444 ar_item = ar_item->next;
448 if (NULL != corep->mq)
450 GNUNET_MQ_destroy (corep->mq);
458 * Destroy cadet peer struct
460 * @param cadetp struct to destroy
463 destroy_cadet_peer (struct CadetPeer *cadetp)
465 cadetp->destroying = GNUNET_YES;
466 if (NULL != cadetp->reconnect_task)
468 GNUNET_SCHEDULER_cancel (cadetp->reconnect_task);
469 cadetp->reconnect_task = NULL;
471 if (NULL != cadetp->mq)
473 GNUNET_MQ_destroy (cadetp->mq);
476 if (NULL != cadetp->channel)
478 GNUNET_CADET_channel_destroy (cadetp->channel);
479 cadetp->channel = NULL;
481 GNUNET_free (cadetp);
486 * Stop sensor reporting module
489 SENSOR_reporting_stop ()
491 struct ValueInfo *vi;
492 struct CorePeer *corep;
493 struct AnomalyInfo *ai;
494 struct CadetPeer *cadetp;
496 LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor anomaly reporting module.\n");
497 module_running = GNUNET_NO;
499 /* Destroy value info's */
503 GNUNET_CONTAINER_DLL_remove (vi_head, vi_tail, vi);
504 destroy_value_info (vi);
507 /* Destroy core peers */
509 while (NULL != corep)
511 GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
512 destroy_core_peer (corep);
515 /* Destroy anomaly info's */
519 GNUNET_CONTAINER_DLL_remove (ai_head, ai_tail, ai);
520 destroy_anomaly_info (ai);
523 /* Destroy cadet peers */
524 cadetp = cadetp_head;
525 while (NULL != cadetp)
527 GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
528 destroy_cadet_peer (cadetp);
529 cadetp = cadetp_head;
531 /* Disconnect from other services */
534 GNUNET_CORE_disconnect (core);
537 if (NULL != peerstore)
539 GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_NO);
544 GNUNET_CADET_disconnect (cadet);
550 /******************************************************************************/
551 /****************************** HELPERS ******************************/
552 /******************************************************************************/
556 * Gets the anomaly info struct related to the given sensor
558 * @param sensor Sensor to search by
560 static struct AnomalyInfo *
561 get_anomaly_info_by_sensor (struct GNUNET_SENSOR_SensorInfo *sensor)
563 struct AnomalyInfo *ai;
568 if (ai->sensor == sensor)
579 * Function called to notify a client about the connection
580 * being ready to queue more data. "buf" will be
581 * NULL and "size" zero if the connection was closed for
582 * writing in the meantime.
585 * @param size number of bytes available in buf
586 * @param buf where the callee should write the message
587 * @return number of bytes written to buf
590 cp_mq_ntr (void *cls, size_t size, void *buf)
592 struct CadetPeer *cadetp = cls;
593 const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (cadetp->mq);
596 LOG (GNUNET_ERROR_TYPE_DEBUG, "cp_mq_ntr()\n");
600 LOG (GNUNET_ERROR_TYPE_INFO,
601 "Sending anomaly report to collection point failed."
602 " Retrying connection in %s.\n",
603 GNUNET_STRINGS_relative_time_to_string (CP_RETRY, GNUNET_NO));
604 cadetp->reconnect_task =
605 GNUNET_SCHEDULER_add_delayed (CP_RETRY, &cp_reconnect, cadetp);
608 msize = ntohs (msg->size);
609 GNUNET_assert (msize <= size);
610 memcpy (buf, msg, msize);
611 GNUNET_MQ_impl_send_continue (cadetp->mq);
617 * Try reconnecting to collection point and send last queued message
620 cp_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
622 struct CadetPeer *cadetp = cls;
623 const struct GNUNET_MessageHeader *msg;
625 LOG (GNUNET_ERROR_TYPE_INFO,
626 "Retrying connection to collection point `%s'.\n",
627 GNUNET_i2s (&cadetp->peer_id));
628 cadetp->reconnect_task = NULL;
629 GNUNET_assert (NULL == cadetp->channel);
631 GNUNET_CADET_channel_create (cadet, cadetp, &cadetp->peer_id,
632 GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
633 GNUNET_CADET_OPTION_RELIABLE);
634 msg = GNUNET_MQ_impl_current (cadetp->mq);
636 GNUNET_CADET_notify_transmit_ready (cadetp->channel, GNUNET_NO,
637 GNUNET_TIME_UNIT_FOREVER_REL,
638 ntohs (msg->size), cp_mq_ntr, cadetp);
643 * Signature of functions implementing the
644 * sending functionality of a message queue.
646 * @param mq the message queue
647 * @param msg the message to send
648 * @param impl_state state of the implementation
651 cp_mq_send_impl (struct GNUNET_MQ_Handle *mq,
652 const struct GNUNET_MessageHeader *msg, void *impl_state)
654 struct CadetPeer *cadetp = impl_state;
656 LOG (GNUNET_ERROR_TYPE_DEBUG, "cp_mq_send_impl()\n");
657 GNUNET_assert (NULL == cadetp->th);
658 if (NULL == cadetp->channel)
660 LOG (GNUNET_ERROR_TYPE_INFO,
661 "Sending anomaly report to collection point failed."
662 " Retrying connection in %s.\n",
663 GNUNET_STRINGS_relative_time_to_string (CP_RETRY, GNUNET_NO));
664 cadetp->reconnect_task =
665 GNUNET_SCHEDULER_add_delayed (CP_RETRY, &cp_reconnect, cadetp);
669 GNUNET_CADET_notify_transmit_ready (cadetp->channel, GNUNET_NO,
670 GNUNET_TIME_UNIT_FOREVER_REL,
671 ntohs (msg->size), cp_mq_ntr, cadetp);
676 * Signature of functions implementing the
677 * destruction of a message queue.
678 * Implementations must not free 'mq', but should
679 * take care of 'impl_state'.
681 * @param mq the message queue to destroy
682 * @param impl_state state of the implementation
685 cp_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
687 struct CadetPeer *cp = impl_state;
689 LOG (GNUNET_ERROR_TYPE_DEBUG, "cp_mq_destroy_impl()\n");
692 GNUNET_CADET_notify_transmit_ready_cancel (cp->th);
699 * Create the message queue used to send messages to a collection point.
700 * This will be used to make sure that the messages are queued even if the
701 * connection to the collection point can not be established at the moment.
703 * @param cp CadetPeer information struct
704 * @return Message queue handle
706 static struct GNUNET_MQ_Handle *
707 cp_mq_create (struct CadetPeer *cp)
709 return GNUNET_MQ_queue_for_callbacks (cp_mq_send_impl, cp_mq_destroy_impl,
710 NULL, cp, NULL, NULL, NULL);
715 * Returns context of a connected CADET peer.
716 * Creates it first if it didn't exist before.
718 * @param pid Peer Identity
719 * @return Context of connected CADET peer
721 static struct CadetPeer *
722 get_cadet_peer (struct GNUNET_PeerIdentity pid)
724 struct CadetPeer *cadetp;
726 cadetp = cadetp_head;
727 while (NULL != cadetp)
729 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cadetp->peer_id))
731 cadetp = cadetp->next;
733 LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating a CADET connection to peer `%s'.\n",
735 /* Not found, create struct and channel */
736 cadetp = GNUNET_new (struct CadetPeer);
737 cadetp->peer_id = pid;
739 GNUNET_CADET_channel_create (cadet, cadetp, &pid,
740 GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
741 GNUNET_CADET_OPTION_RELIABLE);
742 cadetp->mq = cp_mq_create (cadetp);
743 cadetp->reconnect_task = NULL;
744 GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp);
750 * This function is called only when we have a block ready and want to send it
751 * to the given peer (represented by its message queue)
753 * @param mq Message queue to put the message in
754 * @param ai Anomaly info to report
755 * @param type Message type
758 do_send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai,
761 struct GNUNET_MessageHeader *msg;
762 struct GNUNET_MQ_Envelope *ev;
765 GNUNET_assert (NULL != ai->report_block);
767 sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
768 ai->report_block->msg_size;
769 ev = GNUNET_MQ_msg_header_extra (msg, block_size, type);
770 memcpy (&msg[1], ai->report_block, block_size);
771 GNUNET_MQ_send (mq, ev);
776 * Check if we have a signed proof-of-work block ready.
777 * If yes, we send the report directly, if no, we enqueue the reporting until
778 * the block is ready.
780 * @param mq Message queue to put the message in
781 * @param ai Anomaly info to report
782 * @param p2p Is the report sent to a neighboring peer
785 send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai,
788 struct AnomalyReportingQueueItem *ar_item;
793 p2p) ? GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P :
794 GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT;
795 if (NULL == ai->report_block)
797 ar_item = GNUNET_new (struct AnomalyReportingQueueItem);
799 ar_item->dest_mq = mq;
800 ar_item->type = type;
801 GNUNET_CONTAINER_DLL_insert_tail (ai->reporting_queue_head,
802 ai->reporting_queue_tail, ar_item);
806 do_send_anomaly_report (mq, ai, type);
812 * Callback when the crypto module finished creating proof-of-work and signature
813 * for an anomaly report.
815 * @param cls Closure, a `struct AnomalyInfo *`
816 * @param block The resulting block, NULL on error
819 report_creation_cb (void *cls, struct GNUNET_SENSOR_crypto_pow_block *block)
821 struct AnomalyInfo *ai = cls;
822 struct AnomalyReportingQueueItem *ar_item;
824 ai->report_creation_cx = NULL;
825 if (NULL != ai->report_block)
827 LOG (GNUNET_ERROR_TYPE_ERROR,
828 _("Double creation of proof-of-work, this should not happen.\n"));
833 LOG (GNUNET_ERROR_TYPE_ERROR,
834 _("Failed to create pow and signature block.\n"));
837 LOG (GNUNET_ERROR_TYPE_DEBUG, "Anomaly report POW block ready.\n");
839 GNUNET_memdup (block,
840 sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
842 ar_item = ai->reporting_queue_head;
843 while (NULL != ar_item)
845 GNUNET_CONTAINER_DLL_remove (ai->reporting_queue_head,
846 ai->reporting_queue_tail, ar_item);
847 do_send_anomaly_report (ar_item->dest_mq, ai, ar_item->type);
848 GNUNET_free (ar_item);
849 ar_item = ai->reporting_queue_head;
855 * When a change to the anomaly info of a sensor is done, this function should
856 * be called to create the message, its proof-of-work and signature ready to
857 * be sent to other peers or collection point.
859 * @param ai Anomaly Info struct
862 update_anomaly_report_pow_block (struct AnomalyInfo *ai)
864 struct GNUNET_SENSOR_AnomalyReportMessage *arm;
865 struct GNUNET_TIME_Absolute timestamp;
867 LOG (GNUNET_ERROR_TYPE_DEBUG,
868 "Updating anomaly report POW block due to data change.\n");
869 if (NULL != ai->report_block)
871 GNUNET_free (ai->report_block);
872 ai->report_block = NULL;
874 if (NULL != ai->report_creation_cx)
876 /* If a creation is already running, cancel it because the data changed */
877 GNUNET_SENSOR_crypto_pow_sign_cancel (ai->report_creation_cx);
878 ai->report_creation_cx = NULL;
880 arm = GNUNET_new (struct GNUNET_SENSOR_AnomalyReportMessage);
882 GNUNET_CRYPTO_hash (ai->sensor->name, strlen (ai->sensor->name) + 1,
883 &arm->sensorname_hash);
884 arm->sensorversion_major = htons (ai->sensor->version_major);
885 arm->sensorversion_minor = htons (ai->sensor->version_minor);
886 arm->anomalous = htons (ai->anomalous);
887 arm->anomalous_neighbors =
889 neighborhood) ? 0 : ((float)
890 GNUNET_CONTAINER_multipeermap_size
891 (ai->anomalous_neighbors)) / neighborhood;
892 timestamp = GNUNET_TIME_absolute_get ();
893 ai->report_creation_cx =
894 GNUNET_SENSOR_crypto_pow_sign (arm,
896 GNUNET_SENSOR_AnomalyReportMessage),
897 &timestamp, &mypeerid.public_key,
898 private_key, pow_matching_bits,
899 &report_creation_cb, ai);
905 * Create a sensor value message from a given value info struct inside a MQ
908 * @param vi Value info struct to use
909 * @return Envelope with message
911 static struct GNUNET_MQ_Envelope *
912 create_value_message (struct ValueInfo *vi)
914 struct GNUNET_SENSOR_ValueMessage *vm;
915 struct GNUNET_MQ_Envelope *ev;
917 ev = GNUNET_MQ_msg_extra (vm, vi->last_value_size,
918 GNUNET_MESSAGE_TYPE_SENSOR_READING);
919 GNUNET_CRYPTO_hash (vi->sensor->name, strlen (vi->sensor->name) + 1,
920 &vm->sensorname_hash);
921 vm->sensorversion_major = htons (vi->sensor->version_major);
922 vm->sensorversion_minor = htons (vi->sensor->version_minor);
923 vm->timestamp = vi->last_value_timestamp;
924 vm->value_size = htons (vi->last_value_size);
925 memcpy (&vm[1], vi->last_value, vi->last_value_size);
930 /******************************************************************************/
931 /*************************** CORE Handlers ***************************/
932 /******************************************************************************/
936 * An inbound anomaly report is received from a peer through CORE.
938 * @param cls closure (unused)
939 * @param other the other peer involved
940 * @param message the actual message
941 * @return #GNUNET_OK to keep the connection open,
942 * #GNUNET_SYSERR to close connection to the peer (signal serious error)
945 handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other,
946 const struct GNUNET_MessageHeader *message)
948 struct GNUNET_SENSOR_crypto_pow_block *report_block;
949 struct GNUNET_SENSOR_AnomalyReportMessage *arm;
950 struct GNUNET_SENSOR_SensorInfo *sensor;
951 struct AnomalyInfo *my_anomaly_info;
952 struct CadetPeer *cadetp;
954 int peer_in_anomalous_list;
956 /* Verify proof-of-work, signature and extract report message */
957 report_block = (struct GNUNET_SENSOR_crypto_pow_block *) &message[1];
958 if (sizeof (struct GNUNET_SENSOR_AnomalyReportMessage) !=
959 GNUNET_SENSOR_crypto_verify_pow_sign (report_block, pow_matching_bits,
960 (struct GNUNET_CRYPTO_EddsaPublicKey
961 *) &other->public_key,
964 LOG (GNUNET_ERROR_TYPE_WARNING,
965 "Received invalid anomaly report from peer `%s'.\n",
968 return GNUNET_SYSERR;
970 /* Now we parse the content of the message */
971 sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &arm->sensorname_hash);
972 if (NULL == sensor ||
973 sensor->version_major != ntohs (arm->sensorversion_major) ||
974 sensor->version_minor != ntohs (arm->sensorversion_minor))
976 LOG (GNUNET_ERROR_TYPE_WARNING,
977 "I don't have the sensor reported by the peer `%s'.\n",
981 my_anomaly_info = get_anomaly_info_by_sensor (sensor);
982 GNUNET_assert (NULL != my_anomaly_info);
983 peer_in_anomalous_list =
984 GNUNET_CONTAINER_multipeermap_contains
985 (my_anomaly_info->anomalous_neighbors, other);
986 peer_anomalous = ntohs (arm->anomalous);
987 LOG (GNUNET_ERROR_TYPE_DEBUG,
988 "Received an anomaly update from neighbour `%s' (%d).\n",
989 GNUNET_i2s (other), peer_anomalous);
990 if (GNUNET_YES == peer_anomalous)
992 if (GNUNET_YES == peer_in_anomalous_list) /* repeated positive report */
995 GNUNET_CONTAINER_multipeermap_put (my_anomaly_info->anomalous_neighbors,
997 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1001 if (GNUNET_NO == peer_in_anomalous_list) /* repeated negative report */
1002 GNUNET_break_op (0);
1004 GNUNET_CONTAINER_multipeermap_remove_all
1005 (my_anomaly_info->anomalous_neighbors, other);
1007 /* This is important to create an updated block since the data changed */
1008 update_anomaly_report_pow_block (my_anomaly_info);
1009 /* Send anomaly update to collection point only if I have the same anomaly */
1010 if (GNUNET_YES == my_anomaly_info->anomalous &&
1011 NULL != sensor->collection_point &&
1012 GNUNET_YES == sensor->report_anomalies)
1014 LOG (GNUNET_ERROR_TYPE_DEBUG,
1015 "Neighbor update triggered sending anomaly report to collection point `%s'.\n",
1016 GNUNET_i2s (sensor->collection_point));
1017 cadetp = get_cadet_peer (*sensor->collection_point);
1018 send_anomaly_report (cadetp->mq, my_anomaly_info, GNUNET_NO);
1024 /******************************************************************************/
1025 /************************ PEERSTORE callbacks ************************/
1026 /******************************************************************************/
1030 * Sensor value watch callback
1032 * @param cls Closure, ValueInfo struct related to the sensor we are watching
1033 * @param record PEERSTORE new record, NULL if error
1034 * @param emsg Error message, NULL if no error
1035 * @return #GNUNET_YES to continue watching
1038 value_watch_cb (void *cls,
1039 const struct GNUNET_PEERSTORE_Record *record,
1042 struct ValueInfo *vi = cls;
1046 LOG (GNUNET_ERROR_TYPE_ERROR, _("PEERSTORE error: %s.\n"), emsg);
1049 if (NULL != vi->last_value)
1051 GNUNET_free (vi->last_value);
1052 vi->last_value_size = 0;
1054 vi->last_value = GNUNET_memdup (record->value, record->value_size);
1055 vi->last_value_size = record->value_size;
1056 vi->last_value_timestamp = GNUNET_TIME_absolute_get ();
1057 vi->last_value_reported = GNUNET_NO;
1062 /******************************************************************************/
1063 /************************** CORE callbacks ***************************/
1064 /******************************************************************************/
1068 * Method called whenever a CORE peer disconnects.
1070 * @param cls closure (unused)
1071 * @param peer peer identity this notification is about
1074 core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
1076 struct CorePeer *corep;
1078 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
1080 LOG (GNUNET_ERROR_TYPE_DEBUG, "Core peer `%s' disconnected.\n",
1084 while (NULL != corep)
1086 if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, corep->peer_id))
1088 GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
1089 destroy_core_peer (corep);
1092 corep = corep->next;
1098 * Method called whenever a given peer connects through CORE.
1100 * @param cls closure (unused)
1101 * @param peer peer identity this notification is about
1104 core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
1106 struct CorePeer *corep;
1107 struct AnomalyInfo *ai;
1109 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
1111 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core peer `%s'.\n",
1114 corep = GNUNET_new (struct CorePeer);
1115 corep->peer_id = (struct GNUNET_PeerIdentity *) peer;
1116 corep->mq = GNUNET_CORE_mq_create (core, peer);
1117 GNUNET_CONTAINER_DLL_insert (corep_head, corep_tail, corep);
1118 /* Send any locally anomalous sensors to the new peer */
1122 if (GNUNET_YES == ai->anomalous)
1124 LOG (GNUNET_ERROR_TYPE_DEBUG,
1125 "Updating newly connected neighbor `%s' with anomalous sensor.\n",
1127 send_anomaly_report (corep->mq, ai, GNUNET_YES);
1135 * Function called after #GNUNET_CORE_connect has succeeded (or failed
1136 * for good). Note that the private key of the peer is intentionally
1137 * not exposed here; if you need it, your process should try to read
1138 * the private key file directly (which should work if you are
1139 * authorized...). Implementations of this function must not call
1140 * #GNUNET_CORE_disconnect (other than by scheduling a new task to
1143 * @param cls closure (unused)
1144 * @param my_identity ID of this peer, NULL if we failed
1147 core_startup_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity)
1149 if (NULL == my_identity)
1151 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
1152 SENSOR_reporting_stop ();
1155 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, my_identity))
1157 LOG (GNUNET_ERROR_TYPE_ERROR,
1158 _("Peer identity received from CORE init doesn't match ours.\n"));
1159 SENSOR_reporting_stop ();
1165 /******************************************************************************/
1166 /************************* CADET callbacks ***************************/
1167 /******************************************************************************/
1170 * Function called whenever a channel is destroyed. Should clean up
1171 * any associated state.
1173 * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
1175 * @param cls closure (set from #GNUNET_CADET_connect)
1176 * @param channel connection to the other end (henceforth invalid)
1177 * @param channel_ctx place where local state associated
1178 * with the channel is stored
1181 cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel,
1184 struct CadetPeer *cadetp = channel_ctx;
1186 if (GNUNET_YES == cadetp->destroying)
1188 LOG (GNUNET_ERROR_TYPE_DEBUG,
1189 "CADET channel was destroyed by remote peer `%s' or failed to start.\n",
1190 GNUNET_i2s (&cadetp->peer_id));
1191 if (NULL != cadetp->th)
1193 GNUNET_CADET_notify_transmit_ready_cancel (cadetp->th);
1196 cadetp->channel = NULL;
1200 /******************************************************************************/
1201 /********************** Local anomaly receiver ***********************/
1202 /******************************************************************************/
1206 * Used by the analysis module to tell the reporting module about a change in
1207 * the anomaly status of a sensor.
1209 * @param sensor Related sensor
1210 * @param anomalous The new sensor anomalous status
1213 SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor,
1216 struct AnomalyInfo *ai;
1217 struct CorePeer *corep;
1218 struct CadetPeer *cadetp;
1220 if (GNUNET_NO == module_running)
1222 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received an external anomaly update.\n");
1223 ai = get_anomaly_info_by_sensor (sensor);
1224 GNUNET_assert (NULL != ai);
1225 ai->anomalous = anomalous;
1226 /* This is important to create an updated block since the data changed */
1227 update_anomaly_report_pow_block (ai);
1228 /* Report change to all neighbors */
1230 while (NULL != corep)
1232 LOG (GNUNET_ERROR_TYPE_DEBUG,
1233 "Sending an anomaly report to neighbor `%s'.\n",
1234 GNUNET_i2s (corep->peer_id));
1235 send_anomaly_report (corep->mq, ai, GNUNET_YES);
1236 corep = corep->next;
1238 /* Report change to collection point if needed */
1239 if (NULL != ai->sensor->collection_point &&
1240 GNUNET_YES == ai->sensor->report_anomalies)
1242 LOG (GNUNET_ERROR_TYPE_DEBUG,
1243 "Local anomaly update triggered sending anomaly report to collection point `%s'.\n",
1244 GNUNET_i2s (ai->sensor->collection_point));
1245 cadetp = get_cadet_peer (*ai->sensor->collection_point);
1246 send_anomaly_report (cadetp->mq, ai, GNUNET_NO);
1251 /******************************************************************************/
1252 /******************* Reporting values (periodic) *********************/
1253 /******************************************************************************/
1257 * Task scheduled to send values to collection point
1259 * @param cls closure, a `struct ValueInfo *`
1263 report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1265 struct ValueInfo *vi = cls;
1266 struct GNUNET_SENSOR_SensorInfo *sensor = vi->sensor;
1267 struct CadetPeer *cadetp;
1268 struct GNUNET_MQ_Envelope *ev;
1270 vi->reporting_task =
1271 GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
1273 if (0 == vi->last_value_size || GNUNET_YES == vi->last_value_reported)
1275 LOG (GNUNET_ERROR_TYPE_WARNING,
1276 "Did not receive a fresh value from `%s' to report.\n", sensor->name);
1279 LOG (GNUNET_ERROR_TYPE_DEBUG,
1280 "Now trying to report last seen value of `%s' to collection point.\n",
1282 cadetp = get_cadet_peer (*sensor->collection_point);
1283 if (NULL == cadetp->channel)
1285 LOG (GNUNET_ERROR_TYPE_WARNING,
1286 "Trying to send value to collection point but connection failed, discarding.\n");
1289 ev = create_value_message (vi);
1290 GNUNET_MQ_send (cadetp->mq, ev);
1291 vi->last_value_reported = GNUNET_YES;
1295 /******************************************************************************/
1296 /******************************** INIT *******************************/
1297 /******************************************************************************/
1301 * Iterator over defined sensors that creates an anomaly info context for each
1305 * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information
1306 * @return #GNUNET_YES to continue iterations
1309 init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
1312 struct GNUNET_SENSOR_SensorInfo *sensor = value;
1313 struct AnomalyInfo *ai;
1314 struct ValueInfo *vi;
1316 /* Create sensor anomaly info context */
1317 ai = GNUNET_new (struct AnomalyInfo);
1319 ai->sensor = sensor;
1320 ai->anomalous = GNUNET_NO;
1321 ai->anomalous_neighbors =
1322 GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1323 ai->report_block = NULL;
1324 ai->report_creation_cx = NULL;
1325 GNUNET_CONTAINER_DLL_insert (ai_head, ai_tail, ai);
1326 /* Create sensor value info context (if needed to be reported) */
1327 if (NULL == sensor->collection_point || GNUNET_NO == sensor->report_values)
1329 LOG (GNUNET_ERROR_TYPE_INFO,
1330 "Reporting sensor `%s' values to collection point `%s' every %s.\n",
1331 sensor->name, GNUNET_i2s_full (sensor->collection_point),
1332 GNUNET_STRINGS_relative_time_to_string (sensor->value_reporting_interval,
1334 vi = GNUNET_new (struct ValueInfo);
1335 vi->sensor = sensor;
1336 vi->last_value = NULL;
1337 vi->last_value_size = 0;
1338 vi->last_value_reported = GNUNET_NO;
1340 GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
1341 &value_watch_cb, vi);
1342 vi->reporting_task =
1343 GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
1345 GNUNET_CONTAINER_DLL_insert (vi_head, vi_tail, vi);
1351 * Start the sensor anomaly reporting module
1353 * @param c our service configuration
1354 * @param s multihashmap of loaded sensors
1355 * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
1358 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
1359 struct GNUNET_CONTAINER_MultiHashMap *s)
1361 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1362 {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P,
1363 sizeof (struct GNUNET_MessageHeader) +
1364 sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
1365 sizeof (struct GNUNET_SENSOR_AnomalyReportMessage)},
1368 static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1372 LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n");
1373 GNUNET_assert (NULL != s);
1377 GNUNET_CONFIGURATION_get_value_number (cfg, "sensor-reporting",
1378 "POW_MATCHING_BITS",
1379 &pow_matching_bits))
1381 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "sensor-reporting",
1382 "POW_MATCHING_BITS");
1383 SENSOR_reporting_stop ();
1384 return GNUNET_SYSERR;
1386 if (pow_matching_bits > sizeof (struct GNUNET_HashCode))
1388 LOG (GNUNET_ERROR_TYPE_ERROR, "Matching bits value too large (%d > %d).\n",
1389 pow_matching_bits, sizeof (struct GNUNET_HashCode));
1390 SENSOR_reporting_stop ();
1391 return GNUNET_SYSERR;
1393 /* Connect to PEERSTORE */
1394 peerstore = GNUNET_PEERSTORE_connect (cfg);
1395 if (NULL == peerstore)
1397 LOG (GNUNET_ERROR_TYPE_ERROR,
1398 _("Failed to connect to peerstore service.\n"));
1399 SENSOR_reporting_stop ();
1400 return GNUNET_SYSERR;
1402 /* Connect to CORE */
1404 GNUNET_CORE_connect (cfg, NULL, &core_startup_cb, core_connect_cb,
1405 &core_disconnect_cb, NULL, GNUNET_YES, NULL,
1406 GNUNET_YES, core_handlers);
1409 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
1410 SENSOR_reporting_stop ();
1411 return GNUNET_SYSERR;
1413 /* Connect to CADET */
1415 GNUNET_CADET_connect (cfg, NULL, NULL, &cadet_channel_destroyed,
1416 cadet_handlers, NULL);
1419 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CADET service.\n"));
1420 SENSOR_reporting_stop ();
1421 return GNUNET_SYSERR;
1423 private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (cfg);
1424 if (NULL == private_key)
1426 LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to load my private key.\n"));
1427 SENSOR_reporting_stop ();
1428 return GNUNET_SYSERR;
1430 GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid);
1431 GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL);
1433 module_running = GNUNET_YES;
1437 /* end of gnunet-service-sensor_reporting.c */