sensor: fixes for proof-of-work, test passes now
[oweals/gnunet.git] / src / sensor / gnunet-service-sensor_reporting.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file sensor/gnunet-service-sensor_reporting.c
23  * @brief sensor service reporting functionality
24  * @author Omar Tarabai
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "sensor.h"
29 #include "gnunet_peerstore_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
35
36
37 struct AnomalyInfo
38 {
39
40   /**
41    * DLL
42    */
43   struct AnomalyInfo *prev;
44
45   /**
46    * DLL
47    */
48   struct AnomalyInfo *next;
49
50   /**
51    * Sensor information
52    */
53   struct GNUNET_SENSOR_SensorInfo *sensor;
54
55   /**
56    * Current anomalous status of sensor
57    */
58   int anomalous;
59
60   /**
61    * List of peers that reported an anomaly for this sensor
62    */
63   struct GNUNET_CONTAINER_MultiPeerMap *anomalous_neighbors;
64
65 };
66
67 struct ValueInfo
68 {
69
70   /**
71    * DLL
72    */
73   struct ValueInfo *prev;
74
75   /**
76    * DLL
77    */
78   struct ValueInfo *next;
79
80   /**
81    * Sensor information
82    */
83   struct GNUNET_SENSOR_SensorInfo *sensor;
84
85   /**
86    * Last value read from sensor
87    */
88   void *last_value;
89
90   /**
91    * Size of @e last_value
92    */
93   size_t last_value_size;
94
95   /**
96    * Timestamp of last value reading
97    */
98   struct GNUNET_TIME_Absolute last_value_timestamp;
99
100   /**
101    * Has the last value seen already been reported to collection point?
102    */
103   int last_value_reported;
104
105   /**
106    * Watcher of sensor values
107    */
108   struct GNUNET_PEERSTORE_WatchContext *wc;
109
110   /**
111    * Collection point reporting task (or #GNUNET_SCHEDULER_NO_TASK)
112    */
113   GNUNET_SCHEDULER_TaskIdentifier reporting_task;
114
115 };
116
117 /**
118  * Information about a connected CORE peer.
119  * Note that we only know about a connected peer if it is running the same
120  * application (sensor anomaly reporting) as us.
121  */
122 struct CorePeer
123 {
124
125   /**
126    * DLL
127    */
128   struct CorePeer *prev;
129
130   /**
131    * DLL
132    */
133   struct CorePeer *next;
134
135   /**
136    * Peer identity of connected peer
137    */
138   struct GNUNET_PeerIdentity *peer_id;
139
140   /**
141    * Message queue for messages to be sent to this peer
142    */
143   struct GNUNET_MQ_Handle *mq;
144
145 };
146
147 /**
148  * Information about a connected CADET peer (collection point).
149  */
150 struct CadetPeer
151 {
152
153   /**
154    * DLL
155    */
156   struct CadetPeer *prev;
157
158   /**
159    * DLL
160    */
161   struct CadetPeer *next;
162
163   /**
164    * Peer Identity
165    */
166   struct GNUNET_PeerIdentity peer_id;
167
168   /**
169    * CADET channel handle
170    */
171   struct GNUNET_CADET_Channel *channel;
172
173   /**
174    * Message queue for messages to be sent to this peer
175    */
176   struct GNUNET_MQ_Handle *mq;
177
178   /**
179    * Are we currently destroying the channel and its context?
180    */
181   int destroying;
182
183 };
184
185
186 /**
187  * Our configuration.
188  */
189 static const struct GNUNET_CONFIGURATION_Handle *cfg;
190
191 /**
192  * Multihashmap of loaded sensors
193  */
194 static struct GNUNET_CONTAINER_MultiHashMap *sensors;
195
196 /**
197  * Handle to peerstore service
198  */
199 static struct GNUNET_PEERSTORE_Handle *peerstore;
200
201 /**
202  * Handle to core service
203  */
204 static struct GNUNET_CORE_Handle *core;
205
206 /**
207  * Handle to CADET service
208  */
209 static struct GNUNET_CADET_Handle *cadet;
210
211 /**
212  * My peer id
213  */
214 static struct GNUNET_PeerIdentity mypeerid;
215
216 /**
217  * Head of DLL of anomaly info structs
218  */
219 static struct AnomalyInfo *ai_head;
220
221 /**
222  * Tail of DLL of anomaly info structs
223  */
224 static struct AnomalyInfo *ai_tail;
225
226 /**
227  * Head of DLL of value info structs
228  */
229 static struct ValueInfo *vi_head;
230
231 /**
232  * Tail of DLL of value info structs
233  */
234 static struct ValueInfo *vi_tail;
235
236 /**
237  * Head of DLL of CORE peers
238  */
239 static struct CorePeer *corep_head;
240
241 /**
242  * Tail of DLL of CORE peers
243  */
244 static struct CorePeer *corep_tail;
245
246 /**
247  * Head of DLL of CADET peers
248  */
249 static struct CadetPeer *cadetp_head;
250
251 /**
252  * Tail of DLL of CADET peers
253  */
254 static struct CadetPeer *cadetp_tail;
255
256 /**
257  * Is the module started?
258  */
259 static int module_running = GNUNET_NO;
260
261 /**
262  * Number of known neighborhood peers
263  */
264 static int neighborhood;
265
266
267
268 /******************************************************************************/
269 /******************************      CLEANUP     ******************************/
270 /******************************************************************************/
271
272 /**
273  * Destroy anomaly info struct
274  *
275  * @param ai struct to destroy
276  */
277 static void
278 destroy_anomaly_info (struct AnomalyInfo *ai)
279 {
280   if (NULL != ai->anomalous_neighbors)
281     GNUNET_CONTAINER_multipeermap_destroy (ai->anomalous_neighbors);
282   GNUNET_free (ai);
283 }
284
285
286 /**
287  * Destroy value info struct
288  *
289  * @param vi struct to destroy
290  */
291 static void
292 destroy_value_info (struct ValueInfo *vi)
293 {
294   if (NULL != vi->wc)
295   {
296     GNUNET_PEERSTORE_watch_cancel (vi->wc);
297     vi->wc = NULL;
298   }
299   if (GNUNET_SCHEDULER_NO_TASK != vi->reporting_task)
300   {
301     GNUNET_SCHEDULER_cancel (vi->reporting_task);
302     vi->reporting_task = GNUNET_SCHEDULER_NO_TASK;
303   }
304   if (NULL != vi->last_value)
305   {
306     GNUNET_free (vi->last_value);
307     vi->last_value = NULL;
308   }
309   GNUNET_free (vi);
310 }
311
312
313 /**
314  * Destroy core peer struct
315  *
316  * @param corep struct to destroy
317  */
318 static void
319 destroy_core_peer (struct CorePeer *corep)
320 {
321   struct AnomalyInfo *ai;
322
323   if (NULL != corep->mq)
324   {
325     GNUNET_MQ_destroy (corep->mq);
326     corep->mq = NULL;
327   }
328   ai = ai_head;
329   while (NULL != ai)
330   {
331     GNUNET_assert (NULL != ai->anomalous_neighbors);
332     GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors,
333                                               corep->peer_id);
334     ai = ai->next;
335   }
336   GNUNET_free (corep);
337 }
338
339
340 /**
341  * Destroy cadet peer struct
342  *
343  * @param cadetp struct to destroy
344  */
345 static void
346 destroy_cadet_peer (struct CadetPeer *cadetp)
347 {
348   cadetp->destroying = GNUNET_YES;
349   if (NULL != cadetp->mq)
350   {
351     GNUNET_MQ_destroy (cadetp->mq);
352     cadetp->mq = NULL;
353   }
354   if (NULL != cadetp->channel)
355   {
356     GNUNET_CADET_channel_destroy (cadetp->channel);
357     cadetp->channel = NULL;
358   }
359   GNUNET_free (cadetp);
360 }
361
362
363 /**
364  * Stop sensor reporting module
365  */
366 void
367 SENSOR_reporting_stop ()
368 {
369   struct ValueInfo *vi;
370   struct CorePeer *corep;
371   struct AnomalyInfo *ai;
372   struct CadetPeer *cadetp;
373
374   LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor anomaly reporting module.\n");
375   module_running = GNUNET_NO;
376   neighborhood = 0;
377   /* Destroy value info's */
378   vi = vi_head;
379   while (NULL != vi)
380   {
381     GNUNET_CONTAINER_DLL_remove (vi_head, vi_tail, vi);
382     destroy_value_info (vi);
383     vi = vi_head;
384   }
385   /* Destroy core peers */
386   corep = corep_head;
387   while (NULL != corep)
388   {
389     GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
390     destroy_core_peer (corep);
391     corep = corep_head;
392   }
393   /* Destroy anomaly info's */
394   ai = ai_head;
395   while (NULL != ai)
396   {
397     GNUNET_CONTAINER_DLL_remove (ai_head, ai_tail, ai);
398     destroy_anomaly_info (ai);
399     ai = ai_head;
400   }
401   /* Destroy cadet peers */
402   cadetp = cadetp_head;
403   while (NULL != cadetp)
404   {
405     GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
406     destroy_cadet_peer (cadetp);
407     cadetp = cadetp_head;
408   }
409   /* Disconnect from other services */
410   if (NULL != core)
411   {
412     GNUNET_CORE_disconnect (core);
413     core = NULL;
414   }
415   if (NULL != peerstore)
416   {
417     GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_NO);
418     peerstore = NULL;
419   }
420   if (NULL != cadet)
421   {
422     GNUNET_CADET_disconnect (cadet);
423     cadet = NULL;
424   }
425 }
426
427
428 /******************************************************************************/
429 /******************************      HELPERS     ******************************/
430 /******************************************************************************/
431
432
433 /**
434  * Gets the anomaly info struct related to the given sensor
435  *
436  * @param sensor Sensor to search by
437  */
438 static struct AnomalyInfo *
439 get_anomaly_info_by_sensor (struct GNUNET_SENSOR_SensorInfo *sensor)
440 {
441   struct AnomalyInfo *ai;
442
443   ai = ai_head;
444   while (NULL != ai)
445   {
446     if (ai->sensor == sensor)
447     {
448       return ai;
449     }
450     ai = ai->next;
451   }
452   return NULL;
453 }
454
455
456 /**
457  * Returns context of a connected CADET peer.
458  * Creates it first if didn't exist before.
459  *
460  * @param pid Peer Identity
461  * @return Context of connected CADET peer
462  */
463 static struct CadetPeer *
464 get_cadet_peer (struct GNUNET_PeerIdentity pid)
465 {
466   struct CadetPeer *cadetp;
467
468   cadetp = cadetp_head;
469   while (NULL != cadetp)
470   {
471     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cadetp->peer_id))
472       return cadetp;
473     cadetp = cadetp->next;
474   }
475   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating a CADET connection to peer `%s'.\n",
476        GNUNET_i2s (&pid));
477   /* Not found, create struct and channel */
478   cadetp = GNUNET_new (struct CadetPeer);
479   cadetp->peer_id = pid;
480   cadetp->channel =
481       GNUNET_CADET_channel_create (cadet, cadetp, &pid,
482                                    GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
483                                    GNUNET_CADET_OPTION_RELIABLE);
484   cadetp->mq = GNUNET_CADET_mq_create (cadetp->channel);
485   GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp);
486   return cadetp;
487 }
488
489
490 /**
491  * Create an anomaly report message from a given anomaly info struct inside a
492  * MQ envelope.
493  *
494  * @param ai Anomaly info struct to use
495  * @param type Message type
496  * @return Envelope with message
497  */
498 static struct GNUNET_MQ_Envelope *
499 create_anomaly_report_message (struct AnomalyInfo *ai, int type)
500 {
501   struct GNUNET_SENSOR_AnomalyReportMessage *arm;
502   struct GNUNET_MQ_Envelope *ev;
503
504   ev = GNUNET_MQ_msg (arm, type);
505   GNUNET_CRYPTO_hash (ai->sensor->name, strlen (ai->sensor->name) + 1,
506                       &arm->sensorname_hash);
507   arm->sensorversion_major = htons (ai->sensor->version_major);
508   arm->sensorversion_minor = htons (ai->sensor->version_minor);
509   arm->anomalous = htons (ai->anomalous);
510   arm->anomalous_neighbors =
511       (0 ==
512        neighborhood) ? 0 : ((float)
513                             GNUNET_CONTAINER_multipeermap_size
514                             (ai->anomalous_neighbors)) / neighborhood;
515   return ev;
516 }
517
518
519 /**
520  * Create a sensor value message from a given value info struct inside a MQ
521  * envelope.
522  *
523  * @param vi Value info struct to use
524  * @return Envelope with message
525  */
526 static struct GNUNET_MQ_Envelope *
527 create_value_message (struct ValueInfo *vi)
528 {
529   struct GNUNET_SENSOR_ValueMessage *vm;
530   struct GNUNET_MQ_Envelope *ev;
531
532   ev = GNUNET_MQ_msg_extra (vm, vi->last_value_size,
533                             GNUNET_MESSAGE_TYPE_SENSOR_READING);
534   GNUNET_CRYPTO_hash (vi->sensor->name, strlen (vi->sensor->name) + 1,
535                       &vm->sensorname_hash);
536   vm->sensorversion_major = htons (vi->sensor->version_major);
537   vm->sensorversion_minor = htons (vi->sensor->version_minor);
538   vm->timestamp = vi->last_value_timestamp;
539   vm->value_size = htons (vi->last_value_size);
540   memcpy (&vm[1], vi->last_value, vi->last_value_size);
541   return ev;
542 }
543
544
545 /**
546  * Send given anomaly info report by putting it in the given message queue.
547  *
548  * @param mq Message queue to put the message in
549  * @param ai Anomaly info to report
550  * @param p2p Is the report sent to a neighboring peer
551  */
552 static void
553 send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai,
554                      int p2p)
555 {
556   struct GNUNET_MQ_Envelope *ev;
557   int type;
558
559   type =
560       (GNUNET_YES ==
561        p2p) ? GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P :
562       GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT;
563   ev = create_anomaly_report_message (ai, type);
564   GNUNET_MQ_send (mq, ev);
565 }
566
567
568 /******************************************************************************/
569 /***************************      CORE Handlers     ***************************/
570 /******************************************************************************/
571
572
573 /**
574  * An inbound anomaly report is received from a peer through CORE.
575  *
576  * @param cls closure (unused)
577  * @param peer the other peer involved
578  * @param message the actual message
579  * @return #GNUNET_OK to keep the connection open,
580  *         #GNUNET_SYSERR to close connection to the peer (signal serious error)
581  */
582 static int
583 handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other,
584                        const struct GNUNET_MessageHeader *message)
585 {
586   struct GNUNET_SENSOR_AnomalyReportMessage *arm;
587   struct GNUNET_SENSOR_SensorInfo *sensor;
588   struct AnomalyInfo *my_anomaly_info;
589   struct CadetPeer *cadetp;
590   int peer_anomalous;
591   int peer_in_anomalous_list;
592
593   arm = (struct GNUNET_SENSOR_AnomalyReportMessage *) message;
594   sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &arm->sensorname_hash);
595   if (NULL == sensor ||
596       sensor->version_major != ntohs (arm->sensorversion_major) ||
597       sensor->version_minor != ntohs (arm->sensorversion_minor))
598   {
599     LOG (GNUNET_ERROR_TYPE_WARNING,
600          "I don't have the sensor reported by the peer `%s'.\n",
601          GNUNET_i2s (other));
602     return GNUNET_OK;
603   }
604   my_anomaly_info = get_anomaly_info_by_sensor (sensor);
605   GNUNET_assert (NULL != my_anomaly_info);
606   peer_in_anomalous_list =
607       GNUNET_CONTAINER_multipeermap_contains
608       (my_anomaly_info->anomalous_neighbors, other);
609   peer_anomalous = ntohs (arm->anomalous);
610   LOG (GNUNET_ERROR_TYPE_DEBUG,
611        "Received an anomaly update from neighbour `%s' (%d).\n",
612        GNUNET_i2s (other), peer_anomalous);
613   if (GNUNET_YES == peer_anomalous)
614   {
615     if (GNUNET_YES == peer_in_anomalous_list)   /* repeated positive report */
616       GNUNET_break_op (0);
617     else
618       GNUNET_CONTAINER_multipeermap_put (my_anomaly_info->anomalous_neighbors,
619                                          other, NULL,
620                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
621   }
622   else
623   {
624     if (GNUNET_NO == peer_in_anomalous_list)    /* repeated negative report */
625       GNUNET_break_op (0);
626     else
627       GNUNET_CONTAINER_multipeermap_remove_all
628           (my_anomaly_info->anomalous_neighbors, other);
629   }
630   /* Send anomaly update to collection point only if I have the same anomaly */
631   if (GNUNET_YES == my_anomaly_info->anomalous &&
632       NULL != sensor->collection_point &&
633       GNUNET_YES == sensor->report_anomalies)
634   {
635     LOG (GNUNET_ERROR_TYPE_DEBUG,
636          "Neighbor update triggered sending anomaly report to collection point `%s'.\n",
637          GNUNET_i2s (sensor->collection_point));
638     cadetp = get_cadet_peer (*sensor->collection_point);
639     send_anomaly_report (cadetp->mq, my_anomaly_info, GNUNET_NO);
640   }
641   return GNUNET_OK;
642 }
643
644
645 /******************************************************************************/
646 /************************      PEERSTORE callbacks     ************************/
647 /******************************************************************************/
648
649
650 /**
651  * Sensor value watch callback
652  *
653  * @param cls Closure, ValueInfo struct related to the sensor we are watching
654  * @param record PEERSTORE new record, NULL if error
655  * @param emsg Error message, NULL if no error
656  * @return GNUNET_YES to continue watching
657  */
658 static int
659 value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
660 {
661   struct ValueInfo *vi = cls;
662
663   if (NULL != emsg)
664   {
665     LOG (GNUNET_ERROR_TYPE_ERROR, _("PEERSTORE error: %s.\n"), emsg);
666     return GNUNET_YES;
667   }
668   if (NULL != vi->last_value)
669   {
670     GNUNET_free (vi->last_value);
671     vi->last_value_size = 0;
672   }
673   vi->last_value = GNUNET_memdup (record->value, record->value_size);
674   vi->last_value_size = record->value_size;
675   vi->last_value_timestamp = GNUNET_TIME_absolute_get ();
676   vi->last_value_reported = GNUNET_NO;
677   return GNUNET_YES;
678 }
679
680
681 /******************************************************************************/
682 /**************************      CORE callbacks     ***************************/
683 /******************************************************************************/
684
685
686 /**
687  * Method called whenever a CORE peer disconnects.
688  *
689  * @param cls closure (unused)
690  * @param peer peer identity this notification is about
691  */
692 static void
693 core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
694 {
695   struct CorePeer *corep;
696
697   if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
698     return;
699   LOG (GNUNET_ERROR_TYPE_DEBUG, "Core peer `%s' disconnected.\n",
700        GNUNET_i2s (peer));
701   neighborhood--;
702   corep = corep_head;
703   while (NULL != corep)
704   {
705     if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, corep->peer_id))
706     {
707       GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
708       destroy_core_peer (corep);
709       return;
710     }
711     corep = corep->next;
712   }
713 }
714
715
716 /**
717  * Method called whenever a given peer connects through CORE.
718  *
719  * @param cls closure (unused)
720  * @param peer peer identity this notification is about
721  */
722 static void
723 core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
724 {
725   struct CorePeer *corep;
726   struct AnomalyInfo *ai;
727
728   if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
729     return;
730   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core peer `%s'.\n",
731        GNUNET_i2s (peer));
732   neighborhood++;
733   corep = GNUNET_new (struct CorePeer);
734   corep->peer_id = (struct GNUNET_PeerIdentity *) peer;
735   corep->mq = GNUNET_CORE_mq_create (core, peer);
736   GNUNET_CONTAINER_DLL_insert (corep_head, corep_tail, corep);
737   /* Send any locally anomalous sensors to the new peer */
738   ai = ai_head;
739   while (NULL != ai)
740   {
741     if (GNUNET_YES == ai->anomalous)
742     {
743       LOG (GNUNET_ERROR_TYPE_DEBUG,
744            "Updating newly connected neighbor `%s' with anomalous sensor.\n",
745            GNUNET_i2s (peer));
746       send_anomaly_report (corep->mq, ai, GNUNET_YES);
747     }
748     ai = ai->next;
749   }
750 }
751
752
753 /**
754  * Function called after #GNUNET_CORE_connect has succeeded (or failed
755  * for good).  Note that the private key of the peer is intentionally
756  * not exposed here; if you need it, your process should try to read
757  * the private key file directly (which should work if you are
758  * authorized...).  Implementations of this function must not call
759  * #GNUNET_CORE_disconnect (other than by scheduling a new task to
760  * do this later).
761  *
762  * @param cls closure (unused)
763  * @param my_identity ID of this peer, NULL if we failed
764  */
765 static void
766 core_startup_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity)
767 {
768   if (NULL == my_identity)
769   {
770     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
771     SENSOR_reporting_stop ();
772     return;
773   }
774   if (0 != GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, my_identity))
775   {
776     LOG (GNUNET_ERROR_TYPE_ERROR,
777          _("Peer identity received from CORE init doesn't match ours.\n"));
778     SENSOR_reporting_stop ();
779     return;
780   }
781 }
782
783
784 /******************************************************************************/
785 /*************************      CADET callbacks     ***************************/
786 /******************************************************************************/
787
788 /**
789  * Function called whenever a channel is destroyed.  Should clean up
790  * any associated state.
791  *
792  * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
793  *
794  * @param cls closure (set from #GNUNET_CADET_connect)
795  * @param channel connection to the other end (henceforth invalid)
796  * @param channel_ctx place where local state associated
797  *                   with the channel is stored
798  */
799 static void
800 cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel,
801                          void *channel_ctx)
802 {
803   struct CadetPeer *cadetp = channel_ctx;
804
805   if (GNUNET_YES == cadetp->destroying)
806     return;
807   LOG (GNUNET_ERROR_TYPE_DEBUG,
808        "CADET channel was destroyed by remote peer `%s' or failed to start.\n",
809        GNUNET_i2s (&cadetp->peer_id));
810   GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
811   cadetp->channel = NULL;
812   destroy_cadet_peer (cadetp);
813 }
814
815
816 /******************************************************************************/
817 /**********************      Local anomaly receiver     ***********************/
818 /******************************************************************************/
819
820
821 /**
822  * Used by the analysis module to tell the reporting module about a change in
823  * the anomaly status of a sensor.
824  *
825  * @param sensor Related sensor
826  * @param anomalous The new sensor anomalous status
827  */
828 void
829 SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor,
830                                  int anomalous)
831 {
832   struct AnomalyInfo *ai;
833   struct CorePeer *corep;
834   struct CadetPeer *cadetp;
835
836   if (GNUNET_NO == module_running)
837     return;
838   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received an external anomaly update.\n");
839   ai = get_anomaly_info_by_sensor (sensor);
840   GNUNET_assert (NULL != ai);
841   ai->anomalous = anomalous;
842   /* Report change to all neighbors */
843   corep = corep_head;
844   while (NULL != corep)
845   {
846     LOG (GNUNET_ERROR_TYPE_DEBUG,
847          "Sending an anomaly report to neighbor `%s'.\n",
848          GNUNET_i2s (corep->peer_id));
849     send_anomaly_report (corep->mq, ai, GNUNET_YES);
850     corep = corep->next;
851   }
852   /* Report change to collection point if need */
853   if (NULL != ai->sensor->collection_point &&
854       GNUNET_YES == ai->sensor->report_anomalies)
855   {
856     LOG (GNUNET_ERROR_TYPE_DEBUG,
857          "Local anomaly update triggered sending anomaly report to collection point `%s'.\n",
858          GNUNET_i2s (ai->sensor->collection_point));
859     cadetp = get_cadet_peer (*ai->sensor->collection_point);
860     send_anomaly_report (cadetp->mq, ai, GNUNET_NO);
861   }
862 }
863
864
865 /******************************************************************************/
866 /*******************      Reporting values (periodic)     *********************/
867 /******************************************************************************/
868
869
870 /**
871  * Task scheduled to send values to collection point
872  *
873  * @param cls closure, a `struct ValueReportingContext *`
874  * @param tc unused
875  */
876 static void
877 report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
878 {
879   struct ValueInfo *vi = cls;
880   struct GNUNET_SENSOR_SensorInfo *sensor = vi->sensor;
881   struct CadetPeer *cadetp;
882   struct GNUNET_MQ_Envelope *ev;
883
884   vi->reporting_task =
885       GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
886                                     &report_value, vi);
887   if (0 == vi->last_value_size || GNUNET_YES == vi->last_value_reported)
888   {
889     LOG (GNUNET_ERROR_TYPE_WARNING,
890          "Did not receive a fresh value from `%s' to report.\n", sensor->name);
891     return;
892   }
893   LOG (GNUNET_ERROR_TYPE_DEBUG,
894        "Now trying to report last seen value of `%s' to collection point.\n",
895        sensor->name);
896   cadetp = get_cadet_peer (*sensor->collection_point);
897   ev = create_value_message (vi);
898   GNUNET_MQ_send (cadetp->mq, ev);
899   vi->last_value_reported = GNUNET_YES;
900 }
901
902
903 /******************************************************************************/
904 /********************************      INIT     *******************************/
905 /******************************************************************************/
906
907
908 /**
909  * Iterator for defined sensors and creates anomaly info context
910  *
911  * @param cls unused
912  * @param key unused
913  * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information
914  * @return #GNUNET_YES to continue iterations
915  */
916 static int
917 init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
918                        void *value)
919 {
920   struct GNUNET_SENSOR_SensorInfo *sensor = value;
921   struct AnomalyInfo *ai;
922   struct ValueInfo *vi;
923
924   /* Create sensor anomaly info context */
925   ai = GNUNET_new (struct AnomalyInfo);
926
927   ai->sensor = sensor;
928   ai->anomalous = GNUNET_NO;
929   ai->anomalous_neighbors =
930       GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
931   GNUNET_CONTAINER_DLL_insert (ai_head, ai_tail, ai);
932   /* Create sensor value info context (if needed to be reported) */
933   if (NULL == sensor->collection_point || GNUNET_NO == sensor->report_values)
934     return GNUNET_YES;
935   LOG (GNUNET_ERROR_TYPE_INFO,
936        "Reporting sensor `%s' values to collection point `%s' every %s.\n",
937        sensor->name, GNUNET_i2s_full (sensor->collection_point),
938        GNUNET_STRINGS_relative_time_to_string (sensor->value_reporting_interval,
939                                                GNUNET_YES));
940   vi = GNUNET_new (struct ValueInfo);
941   vi->sensor = sensor;
942   vi->last_value = NULL;
943   vi->last_value_size = 0;
944   vi->last_value_reported = GNUNET_NO;
945   vi->wc =
946       GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
947                               &value_watch_cb, vi);
948   vi->reporting_task =
949       GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
950                                     &report_value, vi);
951   GNUNET_CONTAINER_DLL_insert (vi_head, vi_tail, vi);
952   return GNUNET_YES;
953 }
954
955
956 /**
957  * Start the sensor anomaly reporting module
958  *
959  * @param c our service configuration
960  * @param s multihashmap of loaded sensors
961  * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
962  */
963 int
964 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
965                         struct GNUNET_CONTAINER_MultiHashMap *s)
966 {
967   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
968     {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P,
969      sizeof (struct GNUNET_SENSOR_AnomalyReportMessage)},
970     {NULL, 0, 0}
971   };
972   static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
973     {NULL, 0, 0}
974   };
975
976   LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n");
977   GNUNET_assert (NULL != s);
978   sensors = s;
979   cfg = c;
980   /* Connect to PEERSTORE */
981   peerstore = GNUNET_PEERSTORE_connect (cfg);
982   if (NULL == peerstore)
983   {
984     LOG (GNUNET_ERROR_TYPE_ERROR,
985          _("Failed to connect to peerstore service.\n"));
986     SENSOR_reporting_stop ();
987     return GNUNET_SYSERR;
988   }
989   /* Connect to CORE */
990   core =
991       GNUNET_CORE_connect (cfg, NULL, &core_startup_cb, core_connect_cb,
992                            &core_disconnect_cb, NULL, GNUNET_YES, NULL,
993                            GNUNET_YES, core_handlers);
994   if (NULL == core)
995   {
996     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
997     SENSOR_reporting_stop ();
998     return GNUNET_SYSERR;
999   }
1000   /* Connect to CADET */
1001   cadet =
1002       GNUNET_CADET_connect (cfg, NULL, NULL, &cadet_channel_destroyed,
1003                             cadet_handlers, NULL);
1004   if (NULL == cadet)
1005   {
1006     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CADET service.\n"));
1007     SENSOR_reporting_stop ();
1008     return GNUNET_SYSERR;
1009   }
1010   GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid);
1011   GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL);
1012   neighborhood = 0;
1013   module_running = GNUNET_YES;
1014   return GNUNET_OK;
1015 }
1016
1017 /* end of gnunet-service-sensor_reporting.c */