sensor: update to test case + fix
[oweals/gnunet.git] / src / sensor / gnunet-service-sensor_reporting.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file sensor/gnunet-service-sensor_reporting.c
23  * @brief sensor service reporting functionality
24  * @author Omar Tarabai
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "sensor.h"
29 #include "gnunet_peerstore_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
35
36
37 struct AnomalyInfo
38 {
39
40   /**
41    * DLL
42    */
43   struct AnomalyInfo *prev;
44
45   /**
46    * DLL
47    */
48   struct AnomalyInfo *next;
49
50   /**
51    * Sensor information
52    */
53   struct GNUNET_SENSOR_SensorInfo *sensor;
54
55   /**
56    * Current anomalous status of sensor
57    */
58   int anomalous;
59
60   /**
61    * List of peers that reported an anomaly for this sensor
62    */
63   struct GNUNET_CONTAINER_MultiPeerMap *anomalous_neighbors;
64
65 };
66
67 struct ValueInfo
68 {
69
70   /**
71    * DLL
72    */
73   struct ValueInfo *prev;
74
75   /**
76    * DLL
77    */
78   struct ValueInfo *next;
79
80   /**
81    * Sensor information
82    */
83   struct GNUNET_SENSOR_SensorInfo *sensor;
84
85   /**
86    * Last value read from sensor
87    */
88   void *last_value;
89
90   /**
91    * Size of @e last_value
92    */
93   size_t last_value_size;
94
95   /**
96    * Timestamp of last value reading
97    */
98   struct GNUNET_TIME_Absolute last_value_timestamp;
99
100   /**
101    * Has the last value seen already been reported to collection point?
102    */
103   int last_value_reported;
104
105   /**
106    * Watcher of sensor values
107    */
108   struct GNUNET_PEERSTORE_WatchContext *wc;
109
110   /**
111    * Collection point reporting task (or #GNUNET_SCHEDULER_NO_TASK)
112    */
113   GNUNET_SCHEDULER_TaskIdentifier reporting_task;
114
115 };
116
117 /**
118  * Information about a connected CORE peer.
119  * Note that we only know about a connected peer if it is running the same
120  * application (sensor anomaly reporting) as us.
121  */
122 struct CorePeer
123 {
124
125   /**
126    * DLL
127    */
128   struct CorePeer *prev;
129
130   /**
131    * DLL
132    */
133   struct CorePeer *next;
134
135   /**
136    * Peer identity of connected peer
137    */
138   struct GNUNET_PeerIdentity *peer_id;
139
140   /**
141    * Message queue for messages to be sent to this peer
142    */
143   struct GNUNET_MQ_Handle *mq;
144
145 };
146
147 /**
148  * Information about a connected CADET peer (collection point).
149  */
150 struct CadetPeer
151 {
152
153   /**
154    * DLL
155    */
156   struct CadetPeer *prev;
157
158   /**
159    * DLL
160    */
161   struct CadetPeer *next;
162
163   /**
164    * Peer Identity
165    */
166   struct GNUNET_PeerIdentity peer_id;
167
168   /**
169    * CADET channel handle
170    */
171   struct GNUNET_CADET_Channel *channel;
172
173   /**
174    * Message queue for messages to be sent to this peer
175    */
176   struct GNUNET_MQ_Handle *mq;
177
178   /**
179    * Are we currently destroying the channel and its context?
180    */
181   int destroying;
182
183 };
184
185
186 /**
187  * Our configuration.
188  */
189 static const struct GNUNET_CONFIGURATION_Handle *cfg;
190
191 /**
192  * Multihashmap of loaded sensors
193  */
194 static struct GNUNET_CONTAINER_MultiHashMap *sensors;
195
196 /**
197  * Handle to peerstore service
198  */
199 static struct GNUNET_PEERSTORE_Handle *peerstore;
200
201 /**
202  * Handle to core service
203  */
204 static struct GNUNET_CORE_Handle *core;
205
206 /**
207  * Handle to CADET service
208  */
209 static struct GNUNET_CADET_Handle *cadet;
210
211 /**
212  * My peer id
213  */
214 static struct GNUNET_PeerIdentity mypeerid;
215
216 /**
217  * Head of DLL of anomaly info structs
218  */
219 static struct AnomalyInfo *ai_head;
220
221 /**
222  * Tail of DLL of anomaly info structs
223  */
224 static struct AnomalyInfo *ai_tail;
225
226 /**
227  * Head of DLL of value info structs
228  */
229 static struct ValueInfo *vi_head;
230
231 /**
232  * Tail of DLL of value info structs
233  */
234 static struct ValueInfo *vi_tail;
235
236 /**
237  * Head of DLL of CORE peers
238  */
239 static struct CorePeer *corep_head;
240
241 /**
242  * Tail of DLL of CORE peers
243  */
244 static struct CorePeer *corep_tail;
245
246 /**
247  * Head of DLL of CADET peers
248  */
249 static struct CadetPeer *cadetp_head;
250
251 /**
252  * Tail of DLL of CADET peers
253  */
254 static struct CadetPeer *cadetp_tail;
255
256 /**
257  * Is the module started?
258  */
259 static int module_running = GNUNET_NO;
260
261 /**
262  * Number of known neighborhood peers
263  */
264 static int neighborhood;
265
266
267
268 /******************************************************************************/
269 /******************************      CLEANUP     ******************************/
270 /******************************************************************************/
271
272 /**
273  * Destroy anomaly info struct
274  *
275  * @param ai struct to destroy
276  */
277 static void
278 destroy_anomaly_info (struct AnomalyInfo *ai)
279 {
280   if (NULL != ai->anomalous_neighbors)
281     GNUNET_CONTAINER_multipeermap_destroy (ai->anomalous_neighbors);
282   GNUNET_free (ai);
283 }
284
285
286 /**
287  * Destroy value info struct
288  *
289  * @param vi struct to destroy
290  */
291 static void
292 destroy_value_info (struct ValueInfo *vi)
293 {
294   if (NULL != vi->wc)
295   {
296     GNUNET_PEERSTORE_watch_cancel (vi->wc);
297     vi->wc = NULL;
298   }
299   if (GNUNET_SCHEDULER_NO_TASK != vi->reporting_task)
300   {
301     GNUNET_SCHEDULER_cancel (vi->reporting_task);
302     vi->reporting_task = GNUNET_SCHEDULER_NO_TASK;
303   }
304   if (NULL != vi->last_value)
305   {
306     GNUNET_free (vi->last_value);
307     vi->last_value = NULL;
308   }
309   GNUNET_free (vi);
310 }
311
312
313 /**
314  * Destroy core peer struct
315  *
316  * @param corep struct to destroy
317  */
318 static void
319 destroy_core_peer (struct CorePeer *corep)
320 {
321   struct AnomalyInfo *ai;
322
323   if (NULL != corep->mq)
324   {
325     GNUNET_MQ_destroy (corep->mq);
326     corep->mq = NULL;
327   }
328   ai = ai_head;
329   while (NULL != ai)
330   {
331     GNUNET_assert (NULL != ai->anomalous_neighbors);
332     GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors,
333                                               corep->peer_id);
334     ai = ai->next;
335   }
336   GNUNET_free (corep);
337 }
338
339
340 /**
341  * Destroy cadet peer struct
342  *
343  * @param cadetp struct to destroy
344  */
345 static void
346 destroy_cadet_peer (struct CadetPeer *cadetp)
347 {
348   cadetp->destroying = GNUNET_YES;
349   if (NULL != cadetp->mq)
350   {
351     GNUNET_MQ_destroy (cadetp->mq);
352     cadetp->mq = NULL;
353   }
354   if (NULL != cadetp->channel)
355   {
356     GNUNET_CADET_channel_destroy (cadetp->channel);
357     cadetp->channel = NULL;
358   }
359   GNUNET_free (cadetp);
360 }
361
362
363 /**
364  * Stop sensor reporting module
365  */
366 void
367 SENSOR_reporting_stop ()
368 {
369   struct ValueInfo *vi;
370   struct CorePeer *corep;
371   struct AnomalyInfo *ai;
372   struct CadetPeer *cadetp;
373
374   LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor anomaly reporting module.\n");
375   module_running = GNUNET_NO;
376   neighborhood = 0;
377   /* Destroy value info's */
378   vi = vi_head;
379   while (NULL != vi)
380   {
381     GNUNET_CONTAINER_DLL_remove (vi_head, vi_tail, vi);
382     destroy_value_info (vi);
383     vi = vi_head;
384   }
385   /* Destroy core peers */
386   corep = corep_head;
387   while (NULL != corep)
388   {
389     GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
390     destroy_core_peer (corep);
391     corep = corep_head;
392   }
393   /* Destroy anomaly info's */
394   ai = ai_head;
395   while (NULL != ai)
396   {
397     GNUNET_CONTAINER_DLL_remove (ai_head, ai_tail, ai);
398     destroy_anomaly_info (ai);
399     ai = ai_head;
400   }
401   /* Destroy cadet peers */
402   cadetp = cadetp_head;
403   while (NULL != cadetp)
404   {
405     GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
406     destroy_cadet_peer (cadetp);
407     cadetp = cadetp_head;
408   }
409   /* Disconnect from other services */
410   if (NULL != core)
411   {
412     GNUNET_CORE_disconnect (core);
413     core = NULL;
414   }
415   if (NULL != peerstore)
416   {
417     GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_NO);
418     peerstore = NULL;
419   }
420   if (NULL != cadet)
421   {
422     GNUNET_CADET_disconnect (cadet);
423     cadet = NULL;
424   }
425 }
426
427
428 /******************************************************************************/
429 /******************************      HELPERS     ******************************/
430 /******************************************************************************/
431
432
433 /**
434  * Gets the anomaly info struct related to the given sensor
435  *
436  * @param sensor Sensor to search by
437  */
438 static struct AnomalyInfo *
439 get_anomaly_info_by_sensor (struct GNUNET_SENSOR_SensorInfo *sensor)
440 {
441   struct AnomalyInfo *ai;
442
443   ai = ai_head;
444   while (NULL != ai)
445   {
446     if (ai->sensor == sensor)
447     {
448       return ai;
449     }
450     ai = ai->next;
451   }
452   return NULL;
453 }
454
455
456 /**
457  * Returns context of a connected CADET peer.
458  * Creates it first if didn't exist before.
459  *
460  * @param pid Peer Identity
461  * @return Context of connected CADET peer
462  */
463 static struct CadetPeer *
464 get_cadet_peer (struct GNUNET_PeerIdentity pid)
465 {
466   struct CadetPeer *cadetp;
467
468   cadetp = cadetp_head;
469   while (NULL != cadetp)
470   {
471     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cadetp->peer_id))
472       return cadetp;
473     cadetp = cadetp->next;
474   }
475   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating a CADET connection to peer `%s'.\n",
476        GNUNET_i2s (&pid));
477   /* Not found, create struct and channel */
478   cadetp = GNUNET_new (struct CadetPeer);
479   cadetp->peer_id = pid;
480   cadetp->channel =
481       GNUNET_CADET_channel_create (cadet, cadetp, &pid,
482                                    GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
483                                    GNUNET_CADET_OPTION_RELIABLE);
484   cadetp->mq = GNUNET_CADET_mq_create (cadetp->channel);
485   GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp);
486   return cadetp;
487 }
488
489
490 /**
491  * Create an anomaly report message from a given anomaly info struct inside a
492  * MQ envelope.
493  *
494  * @param ai Anomaly info struct to use
495  * @param type Message type
496  * @return Envelope with message
497  */
498 static struct GNUNET_MQ_Envelope *
499 create_anomaly_report_message (struct AnomalyInfo *ai, int type)
500 {
501   struct GNUNET_SENSOR_AnomalyReportMessage *arm;
502   struct GNUNET_MQ_Envelope *ev;
503
504   ev = GNUNET_MQ_msg (arm, type);
505   GNUNET_CRYPTO_hash (ai->sensor->name, strlen (ai->sensor->name) + 1,
506                       &arm->sensorname_hash);
507   arm->sensorversion_major = htons (ai->sensor->version_major);
508   arm->sensorversion_minor = htons (ai->sensor->version_minor);
509   arm->anomalous = htons (ai->anomalous);
510   arm->anomalous_neighbors =
511       (0 ==
512        neighborhood) ? 0 : ((float)
513                             GNUNET_CONTAINER_multipeermap_size
514                             (ai->anomalous_neighbors)) / neighborhood;
515   return ev;
516 }
517
518
519 /**
520  * Create a sensor value message from a given value info struct inside a MQ
521  * envelope.
522  *
523  * @param vi Value info struct to use
524  * @return Envelope with message
525  */
526 static struct GNUNET_MQ_Envelope *
527 create_value_message (struct ValueInfo *vi)
528 {
529   struct GNUNET_SENSOR_ValueMessage *vm;
530   struct GNUNET_MQ_Envelope *ev;
531
532   ev = GNUNET_MQ_msg_extra (vm, vi->last_value_size,
533                             GNUNET_MESSAGE_TYPE_SENSOR_READING);
534   GNUNET_CRYPTO_hash (vi->sensor->name, strlen (vi->sensor->name) + 1,
535                       &vm->sensorname_hash);
536   vm->sensorversion_major = htons (vi->sensor->version_major);
537   vm->sensorversion_minor = htons (vi->sensor->version_minor);
538   vm->timestamp = vi->last_value_timestamp;
539   vm->value_size = htons (vi->last_value_size);
540   memcpy (&vm[1], vi->last_value, vi->last_value_size);
541   return ev;
542 }
543
544
545 /**
546  * Send given anomaly info report by putting it in the given message queue.
547  *
548  * @param mq Message queue to put the message in
549  * @param ai Anomaly info to report
550  * @param p2p Is the report sent to a neighboring peer
551  */
552 static void
553 send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai,
554                      int p2p)
555 {
556   struct GNUNET_MQ_Envelope *ev;
557   int type;
558
559   type =
560       (GNUNET_YES ==
561        p2p) ? GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P :
562       GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT;
563   ev = create_anomaly_report_message (ai, type);
564   GNUNET_MQ_send (mq, ev);
565 }
566
567
568 /******************************************************************************/
569 /***************************      CORE Handlers     ***************************/
570 /******************************************************************************/
571
572
573 /**
574  * An inbound anomaly report is received from a peer through CORE.
575  *
576  * @param cls closure (unused)
577  * @param peer the other peer involved
578  * @param message the actual message
579  * @return #GNUNET_OK to keep the connection open,
580  *         #GNUNET_SYSERR to close connection to the peer (signal serious error)
581  */
582 static int
583 handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other,
584                        const struct GNUNET_MessageHeader *message)
585 {
586   struct GNUNET_SENSOR_AnomalyReportMessage *arm;
587   struct GNUNET_SENSOR_SensorInfo *sensor;
588   struct AnomalyInfo *my_anomaly_info;
589   struct CadetPeer *cadetp;
590   int peer_anomalous;
591   int peer_in_anomalous_list;
592
593   arm = (struct GNUNET_SENSOR_AnomalyReportMessage *) message;
594   sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &arm->sensorname_hash);
595   if (NULL == sensor ||
596       sensor->version_major != ntohs (arm->sensorversion_major) ||
597       sensor->version_minor != ntohs (arm->sensorversion_minor))
598   {
599     LOG (GNUNET_ERROR_TYPE_WARNING,
600          "I don't have the sensor reported by the peer `%s'.\n",
601          GNUNET_i2s (other));
602     return GNUNET_OK;
603   }
604   my_anomaly_info = get_anomaly_info_by_sensor (sensor);
605   GNUNET_assert (NULL != my_anomaly_info);
606   peer_in_anomalous_list =
607       GNUNET_CONTAINER_multipeermap_contains
608       (my_anomaly_info->anomalous_neighbors, other);
609   peer_anomalous = ntohs (arm->anomalous);
610   if (GNUNET_YES == peer_anomalous)
611   {
612     if (GNUNET_YES == peer_in_anomalous_list)   /* repeated positive report */
613       GNUNET_break_op (0);
614     else
615       GNUNET_CONTAINER_multipeermap_put (my_anomaly_info->anomalous_neighbors,
616                                          other, NULL,
617                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
618   }
619   else
620   {
621     if (GNUNET_NO == peer_in_anomalous_list)    /* repeated negative report */
622       GNUNET_break_op (0);
623     else
624       GNUNET_CONTAINER_multipeermap_remove_all
625           (my_anomaly_info->anomalous_neighbors, other);
626   }
627   /* Send anomaly update to collection point only if I have the same anomaly */
628   if (GNUNET_YES == my_anomaly_info->anomalous &&
629       NULL != sensor->collection_point &&
630       GNUNET_YES == sensor->report_anomalies)
631   {
632     LOG (GNUNET_ERROR_TYPE_DEBUG,
633          "Neighbor update triggered sending anomaly report to collection point `%s'.\n",
634          GNUNET_i2s (sensor->collection_point));
635     cadetp = get_cadet_peer (*sensor->collection_point);
636     send_anomaly_report (cadetp->mq, my_anomaly_info, GNUNET_NO);
637   }
638   return GNUNET_OK;
639 }
640
641
642 /******************************************************************************/
643 /************************      PEERSTORE callbacks     ************************/
644 /******************************************************************************/
645
646
647 /**
648  * Sensor value watch callback
649  *
650  * @param cls Closure, ValueInfo struct related to the sensor we are watching
651  * @param record PEERSTORE new record, NULL if error
652  * @param emsg Error message, NULL if no error
653  * @return GNUNET_YES to continue watching
654  */
655 static int
656 value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
657 {
658   struct ValueInfo *vi = cls;
659
660   if (NULL != emsg)
661   {
662     LOG (GNUNET_ERROR_TYPE_ERROR, _("PEERSTORE error: %s.\n"), emsg);
663     return GNUNET_YES;
664   }
665   if (NULL != vi->last_value)
666   {
667     GNUNET_free (vi->last_value);
668     vi->last_value_size = 0;
669   }
670   vi->last_value = GNUNET_memdup (record->value, record->value_size);
671   vi->last_value_size = record->value_size;
672   vi->last_value_timestamp = GNUNET_TIME_absolute_get ();
673   vi->last_value_reported = GNUNET_NO;
674   return GNUNET_YES;
675 }
676
677
678 /******************************************************************************/
679 /**************************      CORE callbacks     ***************************/
680 /******************************************************************************/
681
682
683 /**
684  * Method called whenever a CORE peer disconnects.
685  *
686  * @param cls closure (unused)
687  * @param peer peer identity this notification is about
688  */
689 static void
690 core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
691 {
692   struct CorePeer *corep;
693
694   if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
695     return;
696   LOG (GNUNET_ERROR_TYPE_DEBUG, "Core peer `%s' disconnected.\n",
697        GNUNET_i2s (peer));
698   neighborhood--;
699   corep = corep_head;
700   while (NULL != corep)
701   {
702     if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, corep->peer_id))
703     {
704       GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
705       destroy_core_peer (corep);
706       return;
707     }
708     corep = corep->next;
709   }
710 }
711
712
713 /**
714  * Method called whenever a given peer connects through CORE.
715  *
716  * @param cls closure (unused)
717  * @param peer peer identity this notification is about
718  */
719 static void
720 core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
721 {
722   struct CorePeer *corep;
723   struct AnomalyInfo *ai;
724
725   if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
726     return;
727   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core peer `%s'.\n",
728        GNUNET_i2s (peer));
729   neighborhood++;
730   corep = GNUNET_new (struct CorePeer);
731   corep->peer_id = (struct GNUNET_PeerIdentity *) peer;
732   corep->mq = GNUNET_CORE_mq_create (core, peer);
733   GNUNET_CONTAINER_DLL_insert (corep_head, corep_tail, corep);
734   /* Send any locally anomalous sensors to the new peer */
735   ai = ai_head;
736   while (NULL != ai)
737   {
738     if (GNUNET_YES == ai->anomalous)
739     {
740       LOG (GNUNET_ERROR_TYPE_DEBUG,
741            "Updating newly connected neighbor `%s' with anomalous sensor.\n",
742            GNUNET_i2s (peer));
743       send_anomaly_report (corep->mq, ai, GNUNET_YES);
744     }
745     ai = ai->next;
746   }
747 }
748
749
750 /**
751  * Function called after #GNUNET_CORE_connect has succeeded (or failed
752  * for good).  Note that the private key of the peer is intentionally
753  * not exposed here; if you need it, your process should try to read
754  * the private key file directly (which should work if you are
755  * authorized...).  Implementations of this function must not call
756  * #GNUNET_CORE_disconnect (other than by scheduling a new task to
757  * do this later).
758  *
759  * @param cls closure (unused)
760  * @param my_identity ID of this peer, NULL if we failed
761  */
762 static void
763 core_startup_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity)
764 {
765   if (NULL == my_identity)
766   {
767     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
768     SENSOR_reporting_stop ();
769     return;
770   }
771   if (0 != GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, my_identity))
772   {
773     LOG (GNUNET_ERROR_TYPE_ERROR,
774          _("Peer identity received from CORE init doesn't match ours.\n"));
775     SENSOR_reporting_stop ();
776     return;
777   }
778 }
779
780
781 /******************************************************************************/
782 /*************************      CADET callbacks     ***************************/
783 /******************************************************************************/
784
785 /**
786  * Function called whenever a channel is destroyed.  Should clean up
787  * any associated state.
788  *
789  * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
790  *
791  * @param cls closure (set from #GNUNET_CADET_connect)
792  * @param channel connection to the other end (henceforth invalid)
793  * @param channel_ctx place where local state associated
794  *                   with the channel is stored
795  */
796 static void
797 cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel,
798                          void *channel_ctx)
799 {
800   struct CadetPeer *cadetp = channel_ctx;
801
802   if (GNUNET_YES == cadetp->destroying)
803     return;
804   LOG (GNUNET_ERROR_TYPE_DEBUG,
805        "CADET channel was destroyed by remote peer `%s' or failed to start.\n",
806        GNUNET_i2s (&cadetp->peer_id));
807   GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
808   cadetp->channel = NULL;
809   destroy_cadet_peer (cadetp);
810 }
811
812
813 /******************************************************************************/
814 /**********************      Local anomaly receiver     ***********************/
815 /******************************************************************************/
816
817
818 /**
819  * Used by the analysis module to tell the reporting module about a change in
820  * the anomaly status of a sensor.
821  *
822  * @param sensor Related sensor
823  * @param anomalous The new sensor anomalous status
824  */
825 void
826 SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor,
827                                  int anomalous)
828 {
829   struct AnomalyInfo *ai;
830   struct CorePeer *corep;
831   struct CadetPeer *cadetp;
832
833   if (GNUNET_NO == module_running)
834     return;
835   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received an external anomaly update.\n");
836   ai = get_anomaly_info_by_sensor (sensor);
837   GNUNET_assert (NULL != ai);
838   ai->anomalous = anomalous;
839   /* Report change to all neighbors */
840   corep = corep_head;
841   while (NULL != corep)
842   {
843     LOG (GNUNET_ERROR_TYPE_DEBUG,
844          "Sending an anomaly report to neighbor `%s'.\n",
845          GNUNET_i2s (corep->peer_id));
846     send_anomaly_report (corep->mq, ai, GNUNET_YES);
847     corep = corep->next;
848   }
849   /* Report change to collection point if need */
850   if (NULL != ai->sensor->collection_point &&
851       GNUNET_YES == ai->sensor->report_anomalies)
852   {
853     LOG (GNUNET_ERROR_TYPE_DEBUG,
854          "Local anomaly update triggered sending anomaly report to collection point `%s'.\n",
855          GNUNET_i2s (ai->sensor->collection_point));
856     cadetp = get_cadet_peer (*ai->sensor->collection_point);
857     send_anomaly_report (cadetp->mq, ai, GNUNET_NO);
858   }
859 }
860
861
862 /******************************************************************************/
863 /*******************      Reporting values (periodic)     *********************/
864 /******************************************************************************/
865
866
867 /**
868  * Task scheduled to send values to collection point
869  *
870  * @param cls closure, a `struct ValueReportingContext *`
871  * @param tc unused
872  */
873 static void
874 report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
875 {
876   struct ValueInfo *vi = cls;
877   struct GNUNET_SENSOR_SensorInfo *sensor = vi->sensor;
878   struct CadetPeer *cadetp;
879   struct GNUNET_MQ_Envelope *ev;
880
881   vi->reporting_task =
882       GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
883                                     &report_value, vi);
884   if (0 == vi->last_value_size || GNUNET_YES == vi->last_value_reported)
885   {
886     LOG (GNUNET_ERROR_TYPE_WARNING,
887          "Did not receive a fresh value from `%s' to report.\n", sensor->name);
888     return;
889   }
890   LOG (GNUNET_ERROR_TYPE_DEBUG,
891        "Now trying to report last seen value of `%s' to collection point.\n",
892        sensor->name);
893   cadetp = get_cadet_peer (*sensor->collection_point);
894   ev = create_value_message (vi);
895   GNUNET_MQ_send (cadetp->mq, ev);
896   vi->last_value_reported = GNUNET_YES;
897 }
898
899
900 /******************************************************************************/
901 /********************************      INIT     *******************************/
902 /******************************************************************************/
903
904
905 /**
906  * Iterator for defined sensors and creates anomaly info context
907  *
908  * @param cls unused
909  * @param key unused
910  * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information
911  * @return #GNUNET_YES to continue iterations
912  */
913 static int
914 init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
915                        void *value)
916 {
917   struct GNUNET_SENSOR_SensorInfo *sensor = value;
918   struct AnomalyInfo *ai;
919   struct ValueInfo *vi;
920
921   /* Create sensor anomaly info context */
922   ai = GNUNET_new (struct AnomalyInfo);
923
924   ai->sensor = sensor;
925   ai->anomalous = GNUNET_NO;
926   ai->anomalous_neighbors =
927       GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
928   GNUNET_CONTAINER_DLL_insert (ai_head, ai_tail, ai);
929   /* Create sensor value info context (if needed to be reported) */
930   if (NULL == sensor->collection_point || GNUNET_NO == sensor->report_values)
931     return GNUNET_YES;
932   LOG (GNUNET_ERROR_TYPE_INFO,
933        "Reporting sensor `%s' values to collection point `%s' every %s.\n",
934        sensor->name, GNUNET_i2s_full (sensor->collection_point),
935        GNUNET_STRINGS_relative_time_to_string (sensor->value_reporting_interval,
936                                                GNUNET_YES));
937   vi = GNUNET_new (struct ValueInfo);
938   vi->sensor = sensor;
939   vi->last_value = NULL;
940   vi->last_value_size = 0;
941   vi->last_value_reported = GNUNET_NO;
942   vi->wc =
943       GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
944                               &value_watch_cb, vi);
945   vi->reporting_task =
946       GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
947                                     &report_value, vi);
948   GNUNET_CONTAINER_DLL_insert (vi_head, vi_tail, vi);
949   return GNUNET_YES;
950 }
951
952
953 /**
954  * Start the sensor anomaly reporting module
955  *
956  * @param c our service configuration
957  * @param s multihashmap of loaded sensors
958  * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
959  */
960 int
961 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
962                         struct GNUNET_CONTAINER_MultiHashMap *s)
963 {
964   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
965     {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P,
966      sizeof (struct GNUNET_SENSOR_AnomalyReportMessage)},
967     {NULL, 0, 0}
968   };
969   static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
970     {NULL, 0, 0}
971   };
972
973   LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n");
974   GNUNET_assert (NULL != s);
975   sensors = s;
976   cfg = c;
977   /* Connect to PEERSTORE */
978   peerstore = GNUNET_PEERSTORE_connect (cfg);
979   if (NULL == peerstore)
980   {
981     LOG (GNUNET_ERROR_TYPE_ERROR,
982          _("Failed to connect to peerstore service.\n"));
983     SENSOR_reporting_stop ();
984     return GNUNET_SYSERR;
985   }
986   /* Connect to CORE */
987   core =
988       GNUNET_CORE_connect (cfg, NULL, &core_startup_cb, core_connect_cb,
989                            &core_disconnect_cb, NULL, GNUNET_YES, NULL,
990                            GNUNET_YES, core_handlers);
991   if (NULL == core)
992   {
993     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
994     SENSOR_reporting_stop ();
995     return GNUNET_SYSERR;
996   }
997   /* Connect to CADET */
998   cadet =
999       GNUNET_CADET_connect (cfg, NULL, NULL, &cadet_channel_destroyed,
1000                             cadet_handlers, NULL);
1001   if (NULL == cadet)
1002   {
1003     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CADET service.\n"));
1004     SENSOR_reporting_stop ();
1005     return GNUNET_SYSERR;
1006   }
1007   GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid);
1008   GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL);
1009   neighborhood = 0;
1010   module_running = GNUNET_YES;
1011   return GNUNET_OK;
1012 }
1013
1014 /* end of gnunet-service-sensor_reporting.c */