sensor: separate monitoring functions
[oweals/gnunet.git] / src / sensor / gnunet-service-sensor_reporting.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file sensor/gnunet-service-sensor_reporting.c
23  * @brief sensor service reporting functionality
24  * @author Omar Tarabai
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "sensor.h"
29 #include "gnunet_peerstore_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
35
36
37 struct AnomalyInfo
38 {
39
40   /**
41    * DLL
42    */
43   struct AnomalyInfo *prev;
44
45   /**
46    * DLL
47    */
48   struct AnomalyInfo *next;
49
50   /**
51    * Sensor information
52    */
53   struct GNUNET_SENSOR_SensorInfo *sensor;
54
55   /**
56    * Current anomalous status of sensor
57    */
58   int anomalous;
59
60   /**
61    * List of peers that reported an anomaly for this sensor
62    */
63   struct GNUNET_CONTAINER_MultiPeerMap *anomalous_neighbors;
64
65 };
66
67 struct ValueInfo
68 {
69
70   /**
71    * DLL
72    */
73   struct ValueInfo *prev;
74
75   /**
76    * DLL
77    */
78   struct ValueInfo *next;
79
80   /**
81    * Sensor information
82    */
83   struct GNUNET_SENSOR_SensorInfo *sensor;
84
85   /**
86    * Last value read from sensor
87    */
88   void *last_value;
89
90   /**
91    * Size of @e last_value
92    */
93   size_t last_value_size;
94
95   /**
96    * Timestamp of last value reading
97    */
98   struct GNUNET_TIME_Absolute last_value_timestamp;
99
100   /**
101    * Has the last value seen already been reported to collection point?
102    */
103   int last_value_reported;
104
105   /**
106    * Watcher of sensor values
107    */
108   struct GNUNET_PEERSTORE_WatchContext *wc;
109
110   /**
111    * Collection point reporting task (or #GNUNET_SCHEDULER_NO_TASK)
112    */
113   GNUNET_SCHEDULER_TaskIdentifier reporting_task;
114
115 };
116
117 /**
118  * Information about a connected CORE peer.
119  * Note that we only know about a connected peer if it is running the same
120  * application (sensor anomaly reporting) as us.
121  */
122 struct CorePeer
123 {
124
125   /**
126    * DLL
127    */
128   struct CorePeer *prev;
129
130   /**
131    * DLL
132    */
133   struct CorePeer *next;
134
135   /**
136    * Peer identity of connected peer
137    */
138   struct GNUNET_PeerIdentity *peer_id;
139
140   /**
141    * Message queue for messages to be sent to this peer
142    */
143   struct GNUNET_MQ_Handle *mq;
144
145 };
146
147 /**
148  * Information about a connected CADET peer (collection point).
149  */
150 struct CadetPeer
151 {
152
153   /**
154    * DLL
155    */
156   struct CadetPeer *prev;
157
158   /**
159    * DLL
160    */
161   struct CadetPeer *next;
162
163   /**
164    * Peer Identity
165    */
166   struct GNUNET_PeerIdentity peer_id;
167
168   /**
169    * CADET channel handle
170    */
171   struct GNUNET_CADET_Channel *channel;
172
173   /**
174    * Message queue for messages to be sent to this peer
175    */
176   struct GNUNET_MQ_Handle *mq;
177
178   /**
179    * Are we currently destroying the channel and its context?
180    */
181   int destroying;
182
183 };
184
185
186 /**
187  * Our configuration.
188  */
189 static const struct GNUNET_CONFIGURATION_Handle *cfg;
190
191 /**
192  * Multihashmap of loaded sensors
193  */
194 static struct GNUNET_CONTAINER_MultiHashMap *sensors;
195
196 /**
197  * Handle to peerstore service
198  */
199 static struct GNUNET_PEERSTORE_Handle *peerstore;
200
201 /**
202  * Handle to core service
203  */
204 static struct GNUNET_CORE_Handle *core;
205
206 /**
207  * Handle to CADET service
208  */
209 static struct GNUNET_CADET_Handle *cadet;
210
211 /**
212  * My peer id
213  */
214 static struct GNUNET_PeerIdentity mypeerid;
215
216 /**
217  * Head of DLL of anomaly info structs
218  */
219 static struct AnomalyInfo *ai_head;
220
221 /**
222  * Tail of DLL of anomaly info structs
223  */
224 static struct AnomalyInfo *ai_tail;
225
226 /**
227  * Head of DLL of value info structs
228  */
229 static struct ValueInfo *vi_head;
230
231 /**
232  * Tail of DLL of value info structs
233  */
234 static struct ValueInfo *vi_tail;
235
236 /**
237  * Head of DLL of CORE peers
238  */
239 static struct CorePeer *corep_head;
240
241 /**
242  * Tail of DLL of CORE peers
243  */
244 static struct CorePeer *corep_tail;
245
246 /**
247  * Head of DLL of CADET peers
248  */
249 static struct CadetPeer *cadetp_head;
250
251 /**
252  * Tail of DLL of CADET peers
253  */
254 static struct CadetPeer *cadetp_tail;
255
256 /**
257  * Is the module started?
258  */
259 static int module_running = GNUNET_NO;
260
261 /**
262  * Number of known neighborhood peers
263  */
264 static int neighborhood;
265
266
267
268 /******************************************************************************/
269 /******************************      CLEANUP     ******************************/
270 /******************************************************************************/
271
272 /**
273  * Destroy anomaly info struct
274  *
275  * @param ai struct to destroy
276  */
277 static void
278 destroy_anomaly_info (struct AnomalyInfo *ai)
279 {
280   if (NULL != ai->anomalous_neighbors)
281     GNUNET_CONTAINER_multipeermap_destroy (ai->anomalous_neighbors);
282   GNUNET_free (ai);
283 }
284
285
286 /**
287  * Destroy value info struct
288  *
289  * @param vi struct to destroy
290  */
291 static void
292 destroy_value_info (struct ValueInfo *vi)
293 {
294   if (NULL != vi->wc)
295   {
296     GNUNET_PEERSTORE_watch_cancel (vi->wc);
297     vi->wc = NULL;
298   }
299   if (GNUNET_SCHEDULER_NO_TASK != vi->reporting_task)
300   {
301     GNUNET_SCHEDULER_cancel (vi->reporting_task);
302     vi->reporting_task = GNUNET_SCHEDULER_NO_TASK;
303   }
304   if (NULL != vi->last_value)
305   {
306     GNUNET_free (vi->last_value);
307     vi->last_value = NULL;
308   }
309   GNUNET_free (vi);
310 }
311
312
313 /**
314  * Destroy core peer struct
315  *
316  * @param corep struct to destroy
317  */
318 static void
319 destroy_core_peer (struct CorePeer *corep)
320 {
321   struct AnomalyInfo *ai;
322
323   if (NULL != corep->mq)
324   {
325     GNUNET_MQ_destroy (corep->mq);
326     corep->mq = NULL;
327   }
328   ai = ai_head;
329   while (NULL != ai)
330   {
331     GNUNET_assert (NULL != ai->anomalous_neighbors);
332     GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors,
333                                               corep->peer_id);
334     ai = ai->next;
335   }
336   GNUNET_free (corep);
337 }
338
339
340 /**
341  * Destroy cadet peer struct
342  *
343  * @param cadetp struct to destroy
344  */
345 static void
346 destroy_cadet_peer (struct CadetPeer *cadetp)
347 {
348   cadetp->destroying = GNUNET_YES;
349   if (NULL != cadetp->mq)
350   {
351     GNUNET_MQ_destroy (cadetp->mq);
352     cadetp->mq = NULL;
353   }
354   if (NULL != cadetp->channel)
355   {
356     GNUNET_CADET_channel_destroy (cadetp->channel);
357     cadetp->channel = NULL;
358   }
359   GNUNET_free (cadetp);
360 }
361
362
363 /**
364  * Stop sensor reporting module
365  */
366 void
367 SENSOR_reporting_stop ()
368 {
369   struct ValueInfo *vi;
370   struct CorePeer *corep;
371   struct AnomalyInfo *ai;
372   struct CadetPeer *cadetp;
373
374   LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor anomaly reporting module.\n");
375   module_running = GNUNET_NO;
376   neighborhood = 0;
377   /* Destroy value info's */
378   vi = vi_head;
379   while (NULL != vi)
380   {
381     GNUNET_CONTAINER_DLL_remove (vi_head, vi_tail, vi);
382     destroy_value_info (vi);
383     vi = vi_head;
384   }
385   /* Destroy core peers */
386   corep = corep_head;
387   while (NULL != corep)
388   {
389     GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
390     destroy_core_peer (corep);
391     corep = corep_head;
392   }
393   /* Destroy anomaly info's */
394   ai = ai_head;
395   while (NULL != ai)
396   {
397     GNUNET_CONTAINER_DLL_remove (ai_head, ai_tail, ai);
398     destroy_anomaly_info (ai);
399     ai = ai_head;
400   }
401   /* Destroy cadet peers */
402   cadetp = cadetp_head;
403   while (NULL != cadetp)
404   {
405     GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
406     destroy_cadet_peer (cadetp);
407     cadetp = cadetp_head;
408   }
409   /* Disconnect from other services */
410   if (NULL != core)
411   {
412     GNUNET_CORE_disconnect (core);
413     core = NULL;
414   }
415   if (NULL != peerstore)
416   {
417     GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_NO);
418     peerstore = NULL;
419   }
420   if (NULL != cadet)
421   {
422     GNUNET_CADET_disconnect (cadet);
423     cadet = NULL;
424   }
425 }
426
427
428 /******************************************************************************/
429 /******************************      HELPERS     ******************************/
430 /******************************************************************************/
431
432
433 /**
434  * Gets the anomaly info struct related to the given sensor
435  *
436  * @param sensor Sensor to search by
437  */
438 static struct AnomalyInfo *
439 get_anomaly_info_by_sensor (struct GNUNET_SENSOR_SensorInfo *sensor)
440 {
441   struct AnomalyInfo *ai;
442
443   ai = ai_head;
444   while (NULL != ai)
445   {
446     if (ai->sensor == sensor)
447     {
448       return ai;
449     }
450     ai = ai->next;
451   }
452   return NULL;
453 }
454
455
456 /**
457  * Returns context of a connected CADET peer.
458  * Creates it first if didn't exist before.
459  *
460  * @param pid Peer Identity
461  * @return Context of connected CADET peer
462  */
463 static struct CadetPeer *
464 get_cadet_peer (struct GNUNET_PeerIdentity pid)
465 {
466   struct CadetPeer *cadetp;
467
468   cadetp = cadetp_head;
469   while (NULL != cadetp)
470   {
471     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cadetp->peer_id))
472       return cadetp;
473     cadetp = cadetp->next;
474   }
475   /* Not found, create struct and channel */
476   cadetp = GNUNET_new (struct CadetPeer);
477   cadetp->peer_id = pid;
478   cadetp->channel =
479       GNUNET_CADET_channel_create (cadet, cadetp, &pid,
480                                    GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
481                                    GNUNET_CADET_OPTION_DEFAULT);
482   cadetp->mq = GNUNET_CADET_mq_create (cadetp->channel);
483   GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp);
484   return cadetp;
485 }
486
487
488 /**
489  * Create an anomaly report message from a given anomaly info struct inside a
490  * MQ envelope.
491  *
492  * @param ai Anomaly info struct to use
493  * @return Envelope with message
494  */
495 static struct GNUNET_MQ_Envelope *
496 create_anomaly_report_message (struct AnomalyInfo *ai)
497 {
498   struct GNUNET_SENSOR_AnomalyReportMessage *arm;
499   struct GNUNET_MQ_Envelope *ev;
500
501   ev = GNUNET_MQ_msg (arm, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT);
502   GNUNET_CRYPTO_hash (ai->sensor->name, strlen (ai->sensor->name) + 1,
503                       &arm->sensorname_hash);
504   arm->sensorversion_major = htons (ai->sensor->version_major);
505   arm->sensorversion_minor = htons (ai->sensor->version_minor);
506   arm->anomalous = htons (ai->anomalous);
507   arm->anomalous_neighbors =
508       ((float) GNUNET_CONTAINER_multipeermap_size (ai->anomalous_neighbors)) /
509       neighborhood;
510   return ev;
511 }
512
513
514 /**
515  * Create a sensor value message from a given value info struct inside a MQ
516  * envelope.
517  *
518  * @param vi Value info struct to use
519  * @return Envelope with message
520  */
521 static struct GNUNET_MQ_Envelope *
522 create_value_message (struct ValueInfo *vi)
523 {
524   struct GNUNET_SENSOR_ValueMessage *vm;
525   struct GNUNET_MQ_Envelope *ev;
526
527   ev = GNUNET_MQ_msg_extra (vm, vi->last_value_size, GNUNET_MESSAGE_TYPE_SENSOR_READING);
528   GNUNET_CRYPTO_hash (vi->sensor->name, strlen (vi->sensor->name) + 1,
529                       &vm->sensorname_hash);
530   vm->sensorversion_major = htons (vi->sensor->version_major);
531   vm->sensorversion_minor = htons (vi->sensor->version_minor);
532   vm->timestamp = vi->last_value_timestamp;
533   vm->value_size = htons (vi->last_value_size);
534   memcpy (&vm[1], vi->last_value, vi->last_value_size);
535   return ev;
536 }
537
538
539 /**
540  * Send given anomaly info report by putting it in the given message queue.
541  *
542  * @param mq Message queue to put the message in
543  * @param ai Anomaly info to report
544  */
545 static void
546 send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai)
547 {
548   struct GNUNET_MQ_Envelope *ev;
549
550   ev = create_anomaly_report_message (ai);
551   GNUNET_MQ_send (mq, ev);
552 }
553
554
555 /******************************************************************************/
556 /***************************      CORE Handlers     ***************************/
557 /******************************************************************************/
558
559
560 /**
561  * An inbound anomaly report is received from a peer through CORE.
562  *
563  * @param cls closure (unused)
564  * @param peer the other peer involved
565  * @param message the actual message
566  * @return #GNUNET_OK to keep the connection open,
567  *         #GNUNET_SYSERR to close connection to the peer (signal serious error)
568  */
569 static int
570 handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other,
571                        const struct GNUNET_MessageHeader *message)
572 {
573   struct GNUNET_SENSOR_AnomalyReportMessage *arm;
574   struct GNUNET_SENSOR_SensorInfo *sensor;
575   struct AnomalyInfo *ai;
576   struct CadetPeer *cadetp;
577   int peer_in_list;
578
579   arm = (struct GNUNET_SENSOR_AnomalyReportMessage *) message;
580   sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &arm->sensorname_hash);
581   if (NULL == sensor || sensor->version_major != arm->sensorversion_major ||
582       sensor->version_minor != arm->sensorversion_minor)
583   {
584     LOG (GNUNET_ERROR_TYPE_WARNING,
585          "I don't have the sensor reported by the peer `%s'.\n",
586          GNUNET_i2s (other));
587     return GNUNET_OK;
588   }
589   ai = get_anomaly_info_by_sensor (sensor);
590   GNUNET_assert (NULL != ai);
591   peer_in_list =
592       GNUNET_CONTAINER_multipeermap_contains (ai->anomalous_neighbors, other);
593   if (GNUNET_YES == ai->anomalous)
594   {
595     if (GNUNET_YES == peer_in_list)
596       GNUNET_break_op (0);
597     else
598       GNUNET_CONTAINER_multipeermap_put (ai->anomalous_neighbors, other, NULL,
599                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
600   }
601   else
602   {
603     if (GNUNET_NO == peer_in_list)
604       GNUNET_break_op (0);
605     else
606       GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors, other);
607   }
608   /* Send anomaly update to collection point */
609   if (NULL != ai->sensor->collection_point &&
610         GNUNET_YES == ai->sensor->report_anomalies)
611   {
612     cadetp = get_cadet_peer (*ai->sensor->collection_point);
613     send_anomaly_report (cadetp->mq, ai);
614   }
615   return GNUNET_OK;
616 }
617
618
619 /******************************************************************************/
620 /************************      PEERSTORE callbacks     ************************/
621 /******************************************************************************/
622
623
624 /**
625  * Sensor value watch callback
626  *
627  * @param cls Closure, ValueInfo struct related to the sensor we are watching
628  * @param record PEERSTORE new record, NULL if error
629  * @param emsg Error message, NULL if no error
630  * @return GNUNET_YES to continue watching
631  */
632 static int
633 value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
634 {
635   struct ValueInfo *vi = cls;
636
637   if (NULL != emsg)
638   {
639     LOG (GNUNET_ERROR_TYPE_ERROR,
640         _("PEERSTORE error: %s.\n"), emsg);
641     return GNUNET_YES;
642   }
643   if (NULL != vi->last_value)
644   {
645     GNUNET_free (vi->last_value);
646     vi->last_value_size = 0;
647   }
648   vi->last_value = GNUNET_memdup (record->value, record->value_size);
649   vi->last_value_size = record->value_size;
650   vi->last_value_timestamp = GNUNET_TIME_absolute_get();
651   vi->last_value_reported = GNUNET_NO;
652   return GNUNET_YES;
653 }
654
655
656 /******************************************************************************/
657 /**************************      CORE callbacks     ***************************/
658 /******************************************************************************/
659
660
661 /**
662  * Method called whenever a CORE peer disconnects.
663  *
664  * @param cls closure (unused)
665  * @param peer peer identity this notification is about
666  */
667 static void
668 core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
669 {
670   struct CorePeer *corep;
671
672   if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
673     return;
674   neighborhood--;
675   corep = corep_head;
676   while (NULL != corep)
677   {
678     if (peer == corep->peer_id)
679     {
680       GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
681       destroy_core_peer (corep);
682       return;
683     }
684     corep = corep->next;
685   }
686   LOG (GNUNET_ERROR_TYPE_ERROR,
687        _("Received disconnect notification from CORE"
688          " for a peer we didn't know about.\n"));
689 }
690
691
692 /**
693  * Method called whenever a given peer connects through CORE.
694  *
695  * @param cls closure (unused)
696  * @param peer peer identity this notification is about
697  */
698 static void
699 core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
700 {
701   struct CorePeer *corep;
702   struct AnomalyInfo *ai;
703
704   if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
705     return;
706   neighborhood++;
707   corep = GNUNET_new (struct CorePeer);
708   corep->peer_id = (struct GNUNET_PeerIdentity *) peer;
709   corep->mq = GNUNET_CORE_mq_create (core, peer);
710   GNUNET_CONTAINER_DLL_insert (corep_head, corep_tail, corep);
711   /* Send any locally anomalous sensors to the new peer */
712   ai = ai_head;
713   while (NULL != ai)
714   {
715     if (GNUNET_YES == ai->anomalous)
716       send_anomaly_report (corep->mq, ai);
717     ai = ai->next;
718   }
719 }
720
721
722 /**
723  * Function called after #GNUNET_CORE_connect has succeeded (or failed
724  * for good).  Note that the private key of the peer is intentionally
725  * not exposed here; if you need it, your process should try to read
726  * the private key file directly (which should work if you are
727  * authorized...).  Implementations of this function must not call
728  * #GNUNET_CORE_disconnect (other than by scheduling a new task to
729  * do this later).
730  *
731  * @param cls closure (unused)
732  * @param my_identity ID of this peer, NULL if we failed
733  */
734 static void
735 core_startup_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity)
736 {
737   if (NULL == my_identity)
738   {
739     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
740     SENSOR_reporting_stop ();
741     return;
742   }
743   if (0 != GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, my_identity))
744   {
745     LOG (GNUNET_ERROR_TYPE_ERROR,
746          _("Peer identity received from CORE init doesn't match ours.\n"));
747     SENSOR_reporting_stop ();
748     return;
749   }
750 }
751
752
753 /******************************************************************************/
754 /*************************      CADET callbacks     ***************************/
755 /******************************************************************************/
756
757 /**
758  * Function called whenever a channel is destroyed.  Should clean up
759  * any associated state.
760  *
761  * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
762  *
763  * @param cls closure (set from #GNUNET_CADET_connect)
764  * @param channel connection to the other end (henceforth invalid)
765  * @param channel_ctx place where local state associated
766  *                   with the channel is stored
767  */
768 static void
769 cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel,
770                          void *channel_ctx)
771 {
772   struct CadetPeer *cadetp = channel_ctx;
773
774   if (GNUNET_YES == cadetp->destroying)
775       return;
776   GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
777   cadetp->channel = NULL;
778   destroy_cadet_peer (cadetp);
779 }
780
781
782 /******************************************************************************/
783 /**********************      Local anomaly receiver     ***********************/
784 /******************************************************************************/
785
786
787 /**
788  * Used by the analysis module to tell the reporting module about a change in
789  * the anomaly status of a sensor.
790  *
791  * @param sensor Related sensor
792  * @param anomalous The new sensor anomalous status
793  */
794 void
795 SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor,
796                                  int anomalous)
797 {
798   struct AnomalyInfo *ai;
799   struct CorePeer *corep;
800   struct CadetPeer *cadetp;
801
802   if (GNUNET_NO == module_running)
803     return;
804   ai = get_anomaly_info_by_sensor (sensor);
805   GNUNET_assert (NULL != ai);
806   ai->anomalous = anomalous;
807   /* Report change to all neighbors */
808   corep = corep_head;
809   while (NULL != corep)
810   {
811     send_anomaly_report (corep->mq, ai);
812     corep = corep->next;
813   }
814   if (NULL != ai->sensor->collection_point &&
815       GNUNET_YES == ai->sensor->report_anomalies)
816   {
817     cadetp = get_cadet_peer (*ai->sensor->collection_point);
818     send_anomaly_report (cadetp->mq, ai);
819   }
820 }
821
822
823 /******************************************************************************/
824 /*******************      Reporting values (periodic)     *********************/
825 /******************************************************************************/
826
827
828 /**
829  * Task scheduled to send values to collection point
830  *
831  * @param cls closure, a `struct ValueReportingContext *`
832  * @param tc unused
833  */
834 static void
835 report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
836 {
837   struct ValueInfo *vi = cls;
838   struct GNUNET_SENSOR_SensorInfo *sensor = vi->sensor;
839   struct CadetPeer *cadetp;
840   struct GNUNET_MQ_Envelope *ev;
841
842   vi->reporting_task =
843       GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
844                                     &report_value, vi);
845   if (0 == vi->last_value_size ||
846       GNUNET_YES == vi->last_value_reported)
847   {
848     LOG (GNUNET_ERROR_TYPE_WARNING,
849          "Did not receive a fresh value from `%s' to report.\n",
850          sensor->name);
851     return;
852   }
853   LOG (GNUNET_ERROR_TYPE_DEBUG,
854        "Now trying to report last seen value of `%s' to collection point.\n",
855        sensor->name);
856   cadetp = get_cadet_peer (*sensor->collection_point);
857   ev = create_value_message (vi);
858   GNUNET_MQ_send (cadetp->mq, ev);
859   vi->last_value_reported = GNUNET_YES;
860 }
861
862
863 /******************************************************************************/
864 /********************************      INIT     *******************************/
865 /******************************************************************************/
866
867
868 /**
869  * Iterator for defined sensors and creates anomaly info context
870  *
871  * @param cls unused
872  * @param key unused
873  * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information
874  * @return #GNUNET_YES to continue iterations
875  */
876 static int
877 init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
878                        void *value)
879 {
880   struct GNUNET_SENSOR_SensorInfo *sensor = value;
881   struct AnomalyInfo *ai;
882   struct ValueInfo *vi;
883
884   /* Create sensor anomaly info context */
885   ai = GNUNET_new (struct AnomalyInfo);
886   ai->sensor = sensor;
887   ai->anomalous = GNUNET_NO;
888   ai->anomalous_neighbors =
889       GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
890   GNUNET_CONTAINER_DLL_insert (ai_head, ai_tail, ai);
891   /* Create sensor value info context (if needed to be reported) */
892   if (NULL == sensor->collection_point || GNUNET_NO == sensor->report_values)
893     return GNUNET_YES;
894   LOG (GNUNET_ERROR_TYPE_INFO,
895        "Reporting sensor `%s' values to collection point `%s' every %s.\n",
896        sensor->name, GNUNET_i2s_full (sensor->collection_point),
897        GNUNET_STRINGS_relative_time_to_string (sensor->value_reporting_interval,
898                                                GNUNET_YES));
899   vi = GNUNET_new (struct ValueInfo);
900   vi->sensor = sensor;
901   vi->last_value = NULL;
902   vi->last_value_size = 0;
903   vi->last_value_reported = GNUNET_NO;
904   vi->wc =
905     GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
906                             &value_watch_cb, vi);
907   vi->reporting_task =
908     GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
909                                   &report_value, vi);
910   GNUNET_CONTAINER_DLL_insert (vi_head, vi_tail, vi);
911   return GNUNET_YES;
912 }
913
914
915 /**
916  * Start the sensor anomaly reporting module
917  *
918  * @param c our service configuration
919  * @param s multihashmap of loaded sensors
920  * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
921  */
922 int
923 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
924                                 struct GNUNET_CONTAINER_MultiHashMap *s)
925 {
926   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
927     {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT,
928      sizeof (struct GNUNET_SENSOR_AnomalyReportMessage)},
929     {NULL, 0, 0}
930   };
931   static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
932     {NULL, 0, 0}
933   };
934
935   LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n");
936   GNUNET_assert (NULL != s);
937   sensors = s;
938   cfg = c;
939   /* Connect to PEERSTORE */
940   peerstore = GNUNET_PEERSTORE_connect (cfg);
941   if (NULL == peerstore)
942   {
943     LOG (GNUNET_ERROR_TYPE_ERROR,
944          _("Failed to connect to peerstore service.\n"));
945     SENSOR_reporting_stop ();
946     return GNUNET_SYSERR;
947   }
948   /* Connect to CORE */
949   core =
950       GNUNET_CORE_connect (cfg, NULL, &core_startup_cb, core_connect_cb,
951                            &core_disconnect_cb, NULL, GNUNET_YES, NULL,
952                            GNUNET_YES, core_handlers);
953   if (NULL == core)
954   {
955     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
956     SENSOR_reporting_stop ();
957     return GNUNET_SYSERR;
958   }
959   /* Connect to CADET */
960   cadet =
961       GNUNET_CADET_connect (cfg, NULL, NULL, &cadet_channel_destroyed,
962                             cadet_handlers, NULL);
963   if (NULL == cadet)
964   {
965     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CADET service.\n"));
966     SENSOR_reporting_stop ();
967     return GNUNET_SYSERR;
968   }
969   GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid);
970   GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL);
971   neighborhood = 0;
972   module_running = GNUNET_YES;
973   return GNUNET_OK;
974 }
975
976 /* end of gnunet-service-sensor_reporting.c */