104c669459c39fdae35f7e315743728a54079333
[oweals/gnunet.git] / src / sensor / gnunet-service-sensor_reporting.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file sensor/gnunet-service-sensor_reporting.c
23  * @brief sensor service reporting functionality
24  * @author Omar Tarabai
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "sensor.h"
29 #include "gnunet_peerstore_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
35
36
37 /**
38  * When we are still generating a proof-of-work and we need to send an anomaly
39  * report, we queue them until the generation is complete
40  */
41 struct AnomalyReportingQueueItem
42 {
43
44   /**
45    * DLL
46    */
47   struct AnomalyReportingQueueItem *prev;
48
49   /**
50    * DLL
51    */
52   struct AnomalyReportingQueueItem *next;
53
54   /**
55    * Message queue belonging to the peer that is the destination of the report
56    */
57   struct GNUNET_MQ_Handle *dest_mq;
58
59   /**
60    * Report type
61    */
62   int type;
63
64 };
65
66 struct AnomalyInfo
67 {
68
69   /**
70    * DLL
71    */
72   struct AnomalyInfo *prev;
73
74   /**
75    * DLL
76    */
77   struct AnomalyInfo *next;
78
79   /**
80    * Sensor information
81    */
82   struct GNUNET_SENSOR_SensorInfo *sensor;
83
84   /**
85    * Current anomalous status of sensor
86    */
87   int anomalous;
88
89   /**
90    * List of peers that reported an anomaly for this sensor
91    */
92   struct GNUNET_CONTAINER_MultiPeerMap *anomalous_neighbors;
93
94   /**
95    * Report block with proof-of-work and signature
96    */
97   struct GNUNET_SENSOR_crypto_pow_block *report_block;
98
99   /**
100    * Context of an operation creating pow and signature
101    */
102   struct GNUNET_SENSOR_crypto_pow_context *report_creation_cx;
103
104   /**
105    * Head of the queue of pending report destinations
106    */
107   struct AnomalyReportingQueueItem *reporting_queue_head;
108
109   /**
110    * Head of the queue of pending report destinations
111    */
112   struct AnomalyReportingQueueItem *reporting_queue_tail;
113
114 };
115
116 struct ValueInfo
117 {
118
119   /**
120    * DLL
121    */
122   struct ValueInfo *prev;
123
124   /**
125    * DLL
126    */
127   struct ValueInfo *next;
128
129   /**
130    * Sensor information
131    */
132   struct GNUNET_SENSOR_SensorInfo *sensor;
133
134   /**
135    * Last value read from sensor
136    */
137   void *last_value;
138
139   /**
140    * Size of @e last_value
141    */
142   size_t last_value_size;
143
144   /**
145    * Timestamp of last value reading
146    */
147   struct GNUNET_TIME_Absolute last_value_timestamp;
148
149   /**
150    * Has the last value seen already been reported to collection point?
151    */
152   int last_value_reported;
153
154   /**
155    * Watcher of sensor values
156    */
157   struct GNUNET_PEERSTORE_WatchContext *wc;
158
159   /**
160    * Collection point reporting task (or #GNUNET_SCHEDULER_NO_TASK)
161    */
162   GNUNET_SCHEDULER_TaskIdentifier reporting_task;
163
164 };
165
166 /**
167  * Information about a connected CORE peer.
168  * Note that we only know about a connected peer if it is running the same
169  * application (sensor anomaly reporting) as us.
170  */
171 struct CorePeer
172 {
173
174   /**
175    * DLL
176    */
177   struct CorePeer *prev;
178
179   /**
180    * DLL
181    */
182   struct CorePeer *next;
183
184   /**
185    * Peer identity of connected peer
186    */
187   struct GNUNET_PeerIdentity *peer_id;
188
189   /**
190    * Message queue for messages to be sent to this peer
191    */
192   struct GNUNET_MQ_Handle *mq;
193
194 };
195
196 /**
197  * Information about a connected CADET peer (collection point).
198  */
199 struct CadetPeer
200 {
201
202   /**
203    * DLL
204    */
205   struct CadetPeer *prev;
206
207   /**
208    * DLL
209    */
210   struct CadetPeer *next;
211
212   /**
213    * Peer Identity
214    */
215   struct GNUNET_PeerIdentity peer_id;
216
217   /**
218    * CADET channel handle
219    */
220   struct GNUNET_CADET_Channel *channel;
221
222   /**
223    * Message queue for messages to be sent to this peer
224    */
225   struct GNUNET_MQ_Handle *mq;
226
227   /**
228    * Are we currently destroying the channel and its context?
229    */
230   int destroying;
231
232 };
233
234
235 /**
236  * Our configuration.
237  */
238 static const struct GNUNET_CONFIGURATION_Handle *cfg;
239
240 /**
241  * Multihashmap of loaded sensors
242  */
243 static struct GNUNET_CONTAINER_MultiHashMap *sensors;
244
245 /**
246  * Handle to peerstore service
247  */
248 static struct GNUNET_PEERSTORE_Handle *peerstore;
249
250 /**
251  * Handle to core service
252  */
253 static struct GNUNET_CORE_Handle *core;
254
255 /**
256  * Handle to CADET service
257  */
258 static struct GNUNET_CADET_Handle *cadet;
259
260 /**
261  * My peer id
262  */
263 static struct GNUNET_PeerIdentity mypeerid;
264
265 /**
266  * My private key
267  */
268 static struct GNUNET_CRYPTO_EddsaPrivateKey *private_key;
269
270 /**
271  * Head of DLL of anomaly info structs
272  */
273 static struct AnomalyInfo *ai_head;
274
275 /**
276  * Tail of DLL of anomaly info structs
277  */
278 static struct AnomalyInfo *ai_tail;
279
280 /**
281  * Head of DLL of value info structs
282  */
283 static struct ValueInfo *vi_head;
284
285 /**
286  * Tail of DLL of value info structs
287  */
288 static struct ValueInfo *vi_tail;
289
290 /**
291  * Head of DLL of CORE peers
292  */
293 static struct CorePeer *corep_head;
294
295 /**
296  * Tail of DLL of CORE peers
297  */
298 static struct CorePeer *corep_tail;
299
300 /**
301  * Head of DLL of CADET peers
302  */
303 static struct CadetPeer *cadetp_head;
304
305 /**
306  * Tail of DLL of CADET peers
307  */
308 static struct CadetPeer *cadetp_tail;
309
310 /**
311  * Is the module started?
312  */
313 static int module_running = GNUNET_NO;
314
315 /**
316  * Number of known neighborhood peers
317  */
318 static int neighborhood;
319
320 /**
321  * Parameter that defines the complexity of the proof-of-work
322  */
323 static long long unsigned int pow_matching_bits;
324
325
326
327 /******************************************************************************/
328 /******************************      CLEANUP     ******************************/
329 /******************************************************************************/
330
331 /**
332  * Destroy anomaly info struct
333  *
334  * @param ai struct to destroy
335  */
336 static void
337 destroy_anomaly_info (struct AnomalyInfo *ai)
338 {
339   struct AnomalyReportingQueueItem *ar_item;
340
341   ar_item = ai->reporting_queue_head;
342   while (NULL != ar_item)
343   {
344     GNUNET_CONTAINER_DLL_remove (ai->reporting_queue_head,
345                                  ai->reporting_queue_tail, ar_item);
346     GNUNET_free (ar_item);
347     ar_item = ai->reporting_queue_head;
348   }
349   if (NULL != ai->report_creation_cx)
350   {
351     GNUNET_SENSOR_crypto_pow_sign_cancel (ai->report_creation_cx);
352     ai->report_creation_cx = NULL;
353   }
354   if (NULL != ai->report_block)
355   {
356     GNUNET_free (ai->report_block);
357     ai->report_block = NULL;
358   }
359   if (NULL != ai->anomalous_neighbors)
360   {
361     GNUNET_CONTAINER_multipeermap_destroy (ai->anomalous_neighbors);
362     ai->anomalous_neighbors = NULL;
363   }
364   GNUNET_free (ai);
365 }
366
367
368 /**
369  * Destroy value info struct
370  *
371  * @param vi struct to destroy
372  */
373 static void
374 destroy_value_info (struct ValueInfo *vi)
375 {
376   if (NULL != vi->wc)
377   {
378     GNUNET_PEERSTORE_watch_cancel (vi->wc);
379     vi->wc = NULL;
380   }
381   if (GNUNET_SCHEDULER_NO_TASK != vi->reporting_task)
382   {
383     GNUNET_SCHEDULER_cancel (vi->reporting_task);
384     vi->reporting_task = GNUNET_SCHEDULER_NO_TASK;
385   }
386   if (NULL != vi->last_value)
387   {
388     GNUNET_free (vi->last_value);
389     vi->last_value = NULL;
390   }
391   GNUNET_free (vi);
392 }
393
394
395 /**
396  * Destroy core peer struct
397  *
398  * @param corep struct to destroy
399  */
400 static void
401 destroy_core_peer (struct CorePeer *corep)
402 {
403   struct AnomalyInfo *ai;
404
405   if (NULL != corep->mq)
406   {
407     GNUNET_MQ_destroy (corep->mq);
408     corep->mq = NULL;
409   }
410   ai = ai_head;
411   while (NULL != ai)
412   {
413     GNUNET_assert (NULL != ai->anomalous_neighbors);
414     GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors,
415                                               corep->peer_id);
416     ai = ai->next;
417   }
418   GNUNET_free (corep);
419 }
420
421
422 /**
423  * Destroy cadet peer struct
424  *
425  * @param cadetp struct to destroy
426  */
427 static void
428 destroy_cadet_peer (struct CadetPeer *cadetp)
429 {
430   cadetp->destroying = GNUNET_YES;
431   if (NULL != cadetp->mq)
432   {
433     GNUNET_MQ_destroy (cadetp->mq);
434     cadetp->mq = NULL;
435   }
436   if (NULL != cadetp->channel)
437   {
438     GNUNET_CADET_channel_destroy (cadetp->channel);
439     cadetp->channel = NULL;
440   }
441   GNUNET_free (cadetp);
442 }
443
444
445 /**
446  * Stop sensor reporting module
447  */
448 void
449 SENSOR_reporting_stop ()
450 {
451   struct ValueInfo *vi;
452   struct CorePeer *corep;
453   struct AnomalyInfo *ai;
454   struct CadetPeer *cadetp;
455
456   LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor anomaly reporting module.\n");
457   module_running = GNUNET_NO;
458   neighborhood = 0;
459   /* Destroy value info's */
460   vi = vi_head;
461   while (NULL != vi)
462   {
463     GNUNET_CONTAINER_DLL_remove (vi_head, vi_tail, vi);
464     destroy_value_info (vi);
465     vi = vi_head;
466   }
467   /* Destroy core peers */
468   corep = corep_head;
469   while (NULL != corep)
470   {
471     GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
472     destroy_core_peer (corep);
473     corep = corep_head;
474   }
475   /* Destroy anomaly info's */
476   ai = ai_head;
477   while (NULL != ai)
478   {
479     GNUNET_CONTAINER_DLL_remove (ai_head, ai_tail, ai);
480     destroy_anomaly_info (ai);
481     ai = ai_head;
482   }
483   /* Destroy cadet peers */
484   cadetp = cadetp_head;
485   while (NULL != cadetp)
486   {
487     GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
488     destroy_cadet_peer (cadetp);
489     cadetp = cadetp_head;
490   }
491   /* Disconnect from other services */
492   if (NULL != core)
493   {
494     GNUNET_CORE_disconnect (core);
495     core = NULL;
496   }
497   if (NULL != peerstore)
498   {
499     GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_NO);
500     peerstore = NULL;
501   }
502   if (NULL != cadet)
503   {
504     GNUNET_CADET_disconnect (cadet);
505     cadet = NULL;
506   }
507 }
508
509
510 /******************************************************************************/
511 /******************************      HELPERS     ******************************/
512 /******************************************************************************/
513
514
515 /**
516  * Gets the anomaly info struct related to the given sensor
517  *
518  * @param sensor Sensor to search by
519  */
520 static struct AnomalyInfo *
521 get_anomaly_info_by_sensor (struct GNUNET_SENSOR_SensorInfo *sensor)
522 {
523   struct AnomalyInfo *ai;
524
525   ai = ai_head;
526   while (NULL != ai)
527   {
528     if (ai->sensor == sensor)
529     {
530       return ai;
531     }
532     ai = ai->next;
533   }
534   return NULL;
535 }
536
537
538 /**
539  * Returns context of a connected CADET peer.
540  * Creates it first if didn't exist before.
541  *
542  * @param pid Peer Identity
543  * @return Context of connected CADET peer
544  */
545 static struct CadetPeer *
546 get_cadet_peer (struct GNUNET_PeerIdentity pid)
547 {
548   struct CadetPeer *cadetp;
549
550   cadetp = cadetp_head;
551   while (NULL != cadetp)
552   {
553     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cadetp->peer_id))
554       return cadetp;
555     cadetp = cadetp->next;
556   }
557   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating a CADET connection to peer `%s'.\n",
558        GNUNET_i2s (&pid));
559   /* Not found, create struct and channel */
560   cadetp = GNUNET_new (struct CadetPeer);
561   cadetp->peer_id = pid;
562   cadetp->channel =
563       GNUNET_CADET_channel_create (cadet, cadetp, &pid,
564                                    GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
565                                    GNUNET_CADET_OPTION_RELIABLE);
566   cadetp->mq = GNUNET_CADET_mq_create (cadetp->channel);
567   GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp);
568   return cadetp;
569 }
570
571
572 /**
573  * This function is called only when we have a block ready and want to send it
574  * to the given peer (represented by its message queue)
575  *
576  * @param mq Message queue to put the message in
577  * @param ai Anomaly info to report
578  * @param type Message type
579  */
580 static void
581 do_send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai,
582                         int type)
583 {
584   struct GNUNET_MessageHeader *msg;
585   struct GNUNET_MQ_Envelope *ev;
586   size_t block_size;
587
588   GNUNET_assert (NULL != ai->report_block);
589   block_size =
590       sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
591       ai->report_block->msg_size;
592   ev = GNUNET_MQ_msg_header_extra (msg, block_size, type);
593   memcpy (&msg[1], ai->report_block, block_size);
594   GNUNET_MQ_send (mq, ev);
595 }
596
597
598 /**
599  * Check if we have signed and proof-of-work block ready.
600  * If yes, we send the report directly, if no, we enqueue the reporting until
601  * the block is ready.
602  *
603  * @param mq Message queue to put the message in
604  * @param ai Anomaly info to report
605  * @param p2p Is the report sent to a neighboring peer
606  */
607 static void
608 send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai,
609                      int p2p)
610 {
611   struct AnomalyReportingQueueItem *ar_item;
612   int type;
613
614   type =
615       (GNUNET_YES ==
616        p2p) ? GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P :
617       GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT;
618   if (NULL == ai->report_block)
619   {
620     ar_item = GNUNET_new (struct AnomalyReportingQueueItem);
621
622     ar_item->dest_mq = mq;
623     ar_item->type = type;
624     GNUNET_CONTAINER_DLL_insert_tail (ai->reporting_queue_head,
625                                       ai->reporting_queue_tail, ar_item);
626   }
627   else
628   {
629     do_send_anomaly_report (mq, ai, type);
630   }
631 }
632
633
634 /**
635  * Callback when the crypto module finished created proof-of-work and signature
636  * for an anomaly report.
637  *
638  * @param cls Closure, a `struct AnomalyInfo *`
639  * @param block The resulting block, NULL on error
640  */
641 static void
642 report_creation_cb (void *cls, struct GNUNET_SENSOR_crypto_pow_block *block)
643 {
644   struct AnomalyInfo *ai = cls;
645   struct AnomalyReportingQueueItem *ar_item;
646
647   ai->report_creation_cx = NULL;
648   if (NULL != ai->report_block)
649   {
650     LOG (GNUNET_ERROR_TYPE_ERROR,
651          _("Double creation of proof-of-work, this should not happen.\n"));
652     return;
653   }
654   if (NULL == block)
655   {
656     LOG (GNUNET_ERROR_TYPE_ERROR,
657          _("Failed to create pow and signature block.\n"));
658     return;
659   }
660   LOG (GNUNET_ERROR_TYPE_DEBUG, "Anomaly report POW block ready.\n");
661   ai->report_block =
662       GNUNET_memdup (block,
663                      sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
664                      block->msg_size);
665   ar_item = ai->reporting_queue_head;
666   while (NULL != ar_item)
667   {
668     GNUNET_CONTAINER_DLL_remove (ai->reporting_queue_head,
669                                  ai->reporting_queue_tail, ar_item);
670     do_send_anomaly_report (ar_item->dest_mq, ai, ar_item->type);
671     GNUNET_free (ar_item);
672     ar_item = ai->reporting_queue_head;
673   }
674 }
675
676
677 /**
678  * When a change to the anomaly info of a sensor is done, this function should
679  * be called to create the message, its proof-of-work and signuature ready to
680  * be sent to other peers or collection point.
681  *
682  * @param ai Anomaly Info struct
683  */
684 static void
685 update_anomaly_report_pow_block (struct AnomalyInfo *ai)
686 {
687   struct GNUNET_SENSOR_AnomalyReportMessage *arm;
688   struct GNUNET_TIME_Absolute timestamp;
689
690   LOG (GNUNET_ERROR_TYPE_DEBUG,
691        "Updating anomaly report POW block due to data change.\n");
692   if (NULL != ai->report_block)
693   {
694     GNUNET_free (ai->report_block);
695     ai->report_block = NULL;
696   }
697   if (NULL != ai->report_creation_cx)
698   {
699     /* If a creation is already running, cancel it because the data changed */
700     GNUNET_SENSOR_crypto_pow_sign_cancel (ai->report_creation_cx);
701     ai->report_creation_cx = NULL;
702   }
703   arm = GNUNET_new (struct GNUNET_SENSOR_AnomalyReportMessage);
704
705   GNUNET_CRYPTO_hash (ai->sensor->name, strlen (ai->sensor->name) + 1,
706                       &arm->sensorname_hash);
707   arm->sensorversion_major = htons (ai->sensor->version_major);
708   arm->sensorversion_minor = htons (ai->sensor->version_minor);
709   arm->anomalous = htons (ai->anomalous);
710   arm->anomalous_neighbors =
711       (0 ==
712        neighborhood) ? 0 : ((float) GNUNET_CONTAINER_multipeermap_size (ai->
713                                                                         anomalous_neighbors))
714       / neighborhood;
715   timestamp = GNUNET_TIME_absolute_get ();
716   ai->report_creation_cx =
717       GNUNET_SENSOR_crypto_pow_sign (arm,
718                                      sizeof (struct
719                                              GNUNET_SENSOR_AnomalyReportMessage),
720                                      &timestamp, &mypeerid.public_key,
721                                      private_key, pow_matching_bits,
722                                      &report_creation_cb, ai);
723   GNUNET_free (arm);
724 }
725
726
727 /**
728  * Create a sensor value message from a given value info struct inside a MQ
729  * envelope.
730  *
731  * @param vi Value info struct to use
732  * @return Envelope with message
733  */
734 static struct GNUNET_MQ_Envelope *
735 create_value_message (struct ValueInfo *vi)
736 {
737   struct GNUNET_SENSOR_ValueMessage *vm;
738   struct GNUNET_MQ_Envelope *ev;
739
740   ev = GNUNET_MQ_msg_extra (vm, vi->last_value_size,
741                             GNUNET_MESSAGE_TYPE_SENSOR_READING);
742   GNUNET_CRYPTO_hash (vi->sensor->name, strlen (vi->sensor->name) + 1,
743                       &vm->sensorname_hash);
744   vm->sensorversion_major = htons (vi->sensor->version_major);
745   vm->sensorversion_minor = htons (vi->sensor->version_minor);
746   vm->timestamp = vi->last_value_timestamp;
747   vm->value_size = htons (vi->last_value_size);
748   memcpy (&vm[1], vi->last_value, vi->last_value_size);
749   return ev;
750 }
751
752
753 /******************************************************************************/
754 /***************************      CORE Handlers     ***************************/
755 /******************************************************************************/
756
757
758 /**
759  * An inbound anomaly report is received from a peer through CORE.
760  *
761  * @param cls closure (unused)
762  * @param peer the other peer involved
763  * @param message the actual message
764  * @return #GNUNET_OK to keep the connection open,
765  *         #GNUNET_SYSERR to close connection to the peer (signal serious error)
766  */
767 static int
768 handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other,
769                        const struct GNUNET_MessageHeader *message)
770 {
771   struct GNUNET_SENSOR_crypto_pow_block *report_block;
772   struct GNUNET_SENSOR_AnomalyReportMessage *arm;
773   struct GNUNET_SENSOR_SensorInfo *sensor;
774   struct AnomalyInfo *my_anomaly_info;
775   struct CadetPeer *cadetp;
776   int peer_anomalous;
777   int peer_in_anomalous_list;
778
779   /* Verify proof-of-work, signature and extract report message */
780   report_block = (struct GNUNET_SENSOR_crypto_pow_block *) &message[1];
781   if (sizeof (struct GNUNET_SENSOR_AnomalyReportMessage) !=
782       GNUNET_SENSOR_crypto_verify_pow_sign (report_block, pow_matching_bits,
783                                             (struct GNUNET_CRYPTO_EddsaPublicKey
784                                              *) &other->public_key,
785                                             (void **) &arm))
786   {
787     LOG (GNUNET_ERROR_TYPE_WARNING,
788          "Received invalid anomaly report from peer `%s'.\n",
789          GNUNET_i2s (other));
790     GNUNET_break_op (0);
791     return GNUNET_SYSERR;
792   }
793   /* Now we parse the content of the message */
794   sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &arm->sensorname_hash);
795   if (NULL == sensor ||
796       sensor->version_major != ntohs (arm->sensorversion_major) ||
797       sensor->version_minor != ntohs (arm->sensorversion_minor))
798   {
799     LOG (GNUNET_ERROR_TYPE_WARNING,
800          "I don't have the sensor reported by the peer `%s'.\n",
801          GNUNET_i2s (other));
802     return GNUNET_OK;
803   }
804   my_anomaly_info = get_anomaly_info_by_sensor (sensor);
805   GNUNET_assert (NULL != my_anomaly_info);
806   peer_in_anomalous_list =
807       GNUNET_CONTAINER_multipeermap_contains (my_anomaly_info->
808                                               anomalous_neighbors, other);
809   peer_anomalous = ntohs (arm->anomalous);
810   LOG (GNUNET_ERROR_TYPE_DEBUG,
811        "Received an anomaly update from neighbour `%s' (%d).\n",
812        GNUNET_i2s (other), peer_anomalous);
813   if (GNUNET_YES == peer_anomalous)
814   {
815     if (GNUNET_YES == peer_in_anomalous_list)   /* repeated positive report */
816       GNUNET_break_op (0);
817     else
818       GNUNET_CONTAINER_multipeermap_put (my_anomaly_info->anomalous_neighbors,
819                                          other, NULL,
820                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
821   }
822   else
823   {
824     if (GNUNET_NO == peer_in_anomalous_list)    /* repeated negative report */
825       GNUNET_break_op (0);
826     else
827       GNUNET_CONTAINER_multipeermap_remove_all (my_anomaly_info->
828                                                 anomalous_neighbors, other);
829   }
830   /* This is important to create an updated block since the data changed */
831   update_anomaly_report_pow_block (my_anomaly_info);
832   /* Send anomaly update to collection point only if I have the same anomaly */
833   if (GNUNET_YES == my_anomaly_info->anomalous &&
834       NULL != sensor->collection_point &&
835       GNUNET_YES == sensor->report_anomalies)
836   {
837     LOG (GNUNET_ERROR_TYPE_DEBUG,
838          "Neighbor update triggered sending anomaly report to collection point `%s'.\n",
839          GNUNET_i2s (sensor->collection_point));
840     cadetp = get_cadet_peer (*sensor->collection_point);
841     send_anomaly_report (cadetp->mq, my_anomaly_info, GNUNET_NO);
842   }
843   return GNUNET_OK;
844 }
845
846
847 /******************************************************************************/
848 /************************      PEERSTORE callbacks     ************************/
849 /******************************************************************************/
850
851
852 /**
853  * Sensor value watch callback
854  *
855  * @param cls Closure, ValueInfo struct related to the sensor we are watching
856  * @param record PEERSTORE new record, NULL if error
857  * @param emsg Error message, NULL if no error
858  * @return GNUNET_YES to continue watching
859  */
860 static int
861 value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
862 {
863   struct ValueInfo *vi = cls;
864
865   if (NULL != emsg)
866   {
867     LOG (GNUNET_ERROR_TYPE_ERROR, _("PEERSTORE error: %s.\n"), emsg);
868     return GNUNET_YES;
869   }
870   if (NULL != vi->last_value)
871   {
872     GNUNET_free (vi->last_value);
873     vi->last_value_size = 0;
874   }
875   vi->last_value = GNUNET_memdup (record->value, record->value_size);
876   vi->last_value_size = record->value_size;
877   vi->last_value_timestamp = GNUNET_TIME_absolute_get ();
878   vi->last_value_reported = GNUNET_NO;
879   return GNUNET_YES;
880 }
881
882
883 /******************************************************************************/
884 /**************************      CORE callbacks     ***************************/
885 /******************************************************************************/
886
887
888 /**
889  * Method called whenever a CORE peer disconnects.
890  *
891  * @param cls closure (unused)
892  * @param peer peer identity this notification is about
893  */
894 static void
895 core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
896 {
897   struct CorePeer *corep;
898
899   if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
900     return;
901   LOG (GNUNET_ERROR_TYPE_DEBUG, "Core peer `%s' disconnected.\n",
902        GNUNET_i2s (peer));
903   neighborhood--;
904   corep = corep_head;
905   while (NULL != corep)
906   {
907     if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, corep->peer_id))
908     {
909       GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
910       destroy_core_peer (corep);
911       return;
912     }
913     corep = corep->next;
914   }
915 }
916
917
918 /**
919  * Method called whenever a given peer connects through CORE.
920  *
921  * @param cls closure (unused)
922  * @param peer peer identity this notification is about
923  */
924 static void
925 core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
926 {
927   struct CorePeer *corep;
928   struct AnomalyInfo *ai;
929
930   if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
931     return;
932   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core peer `%s'.\n",
933        GNUNET_i2s (peer));
934   neighborhood++;
935   corep = GNUNET_new (struct CorePeer);
936   corep->peer_id = (struct GNUNET_PeerIdentity *) peer;
937   corep->mq = GNUNET_CORE_mq_create (core, peer);
938   GNUNET_CONTAINER_DLL_insert (corep_head, corep_tail, corep);
939   /* Send any locally anomalous sensors to the new peer */
940   ai = ai_head;
941   while (NULL != ai)
942   {
943     if (GNUNET_YES == ai->anomalous)
944     {
945       LOG (GNUNET_ERROR_TYPE_DEBUG,
946            "Updating newly connected neighbor `%s' with anomalous sensor.\n",
947            GNUNET_i2s (peer));
948       send_anomaly_report (corep->mq, ai, GNUNET_YES);
949     }
950     ai = ai->next;
951   }
952 }
953
954
955 /**
956  * Function called after #GNUNET_CORE_connect has succeeded (or failed
957  * for good).  Note that the private key of the peer is intentionally
958  * not exposed here; if you need it, your process should try to read
959  * the private key file directly (which should work if you are
960  * authorized...).  Implementations of this function must not call
961  * #GNUNET_CORE_disconnect (other than by scheduling a new task to
962  * do this later).
963  *
964  * @param cls closure (unused)
965  * @param my_identity ID of this peer, NULL if we failed
966  */
967 static void
968 core_startup_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity)
969 {
970   if (NULL == my_identity)
971   {
972     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
973     SENSOR_reporting_stop ();
974     return;
975   }
976   if (0 != GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, my_identity))
977   {
978     LOG (GNUNET_ERROR_TYPE_ERROR,
979          _("Peer identity received from CORE init doesn't match ours.\n"));
980     SENSOR_reporting_stop ();
981     return;
982   }
983 }
984
985
986 /******************************************************************************/
987 /*************************      CADET callbacks     ***************************/
988 /******************************************************************************/
989
990 /**
991  * Function called whenever a channel is destroyed.  Should clean up
992  * any associated state.
993  *
994  * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
995  *
996  * @param cls closure (set from #GNUNET_CADET_connect)
997  * @param channel connection to the other end (henceforth invalid)
998  * @param channel_ctx place where local state associated
999  *                   with the channel is stored
1000  */
1001 static void
1002 cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel,
1003                          void *channel_ctx)
1004 {
1005   struct CadetPeer *cadetp = channel_ctx;
1006
1007   if (GNUNET_YES == cadetp->destroying)
1008     return;
1009   LOG (GNUNET_ERROR_TYPE_DEBUG,
1010        "CADET channel was destroyed by remote peer `%s' or failed to start.\n",
1011        GNUNET_i2s (&cadetp->peer_id));
1012   GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
1013   cadetp->channel = NULL;
1014   destroy_cadet_peer (cadetp);
1015 }
1016
1017
1018 /******************************************************************************/
1019 /**********************      Local anomaly receiver     ***********************/
1020 /******************************************************************************/
1021
1022
1023 /**
1024  * Used by the analysis module to tell the reporting module about a change in
1025  * the anomaly status of a sensor.
1026  *
1027  * @param sensor Related sensor
1028  * @param anomalous The new sensor anomalous status
1029  */
1030 void
1031 SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor,
1032                                  int anomalous)
1033 {
1034   struct AnomalyInfo *ai;
1035   struct CorePeer *corep;
1036   struct CadetPeer *cadetp;
1037
1038   if (GNUNET_NO == module_running)
1039     return;
1040   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received an external anomaly update.\n");
1041   ai = get_anomaly_info_by_sensor (sensor);
1042   GNUNET_assert (NULL != ai);
1043   ai->anomalous = anomalous;
1044   /* This is important to create an updated block since the data changed */
1045   update_anomaly_report_pow_block (ai);
1046   /* Report change to all neighbors */
1047   corep = corep_head;
1048   while (NULL != corep)
1049   {
1050     LOG (GNUNET_ERROR_TYPE_DEBUG,
1051          "Sending an anomaly report to neighbor `%s'.\n",
1052          GNUNET_i2s (corep->peer_id));
1053     send_anomaly_report (corep->mq, ai, GNUNET_YES);
1054     corep = corep->next;
1055   }
1056   /* Report change to collection point if need */
1057   if (NULL != ai->sensor->collection_point &&
1058       GNUNET_YES == ai->sensor->report_anomalies)
1059   {
1060     LOG (GNUNET_ERROR_TYPE_DEBUG,
1061          "Local anomaly update triggered sending anomaly report to collection point `%s'.\n",
1062          GNUNET_i2s (ai->sensor->collection_point));
1063     cadetp = get_cadet_peer (*ai->sensor->collection_point);
1064     send_anomaly_report (cadetp->mq, ai, GNUNET_NO);
1065   }
1066 }
1067
1068
1069 /******************************************************************************/
1070 /*******************      Reporting values (periodic)     *********************/
1071 /******************************************************************************/
1072
1073
1074 /**
1075  * Task scheduled to send values to collection point
1076  *
1077  * @param cls closure, a `struct ValueReportingContext *`
1078  * @param tc unused
1079  */
1080 static void
1081 report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1082 {
1083   struct ValueInfo *vi = cls;
1084   struct GNUNET_SENSOR_SensorInfo *sensor = vi->sensor;
1085   struct CadetPeer *cadetp;
1086   struct GNUNET_MQ_Envelope *ev;
1087
1088   vi->reporting_task =
1089       GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
1090                                     &report_value, vi);
1091   if (0 == vi->last_value_size || GNUNET_YES == vi->last_value_reported)
1092   {
1093     LOG (GNUNET_ERROR_TYPE_WARNING,
1094          "Did not receive a fresh value from `%s' to report.\n", sensor->name);
1095     return;
1096   }
1097   LOG (GNUNET_ERROR_TYPE_DEBUG,
1098        "Now trying to report last seen value of `%s' to collection point.\n",
1099        sensor->name);
1100   cadetp = get_cadet_peer (*sensor->collection_point);
1101   ev = create_value_message (vi);
1102   GNUNET_MQ_send (cadetp->mq, ev);
1103   vi->last_value_reported = GNUNET_YES;
1104 }
1105
1106
1107 /******************************************************************************/
1108 /********************************      INIT     *******************************/
1109 /******************************************************************************/
1110
1111
1112 /**
1113  * Iterator for defined sensors and creates anomaly info context
1114  *
1115  * @param cls unused
1116  * @param key unused
1117  * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information
1118  * @return #GNUNET_YES to continue iterations
1119  */
1120 static int
1121 init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
1122                        void *value)
1123 {
1124   struct GNUNET_SENSOR_SensorInfo *sensor = value;
1125   struct AnomalyInfo *ai;
1126   struct ValueInfo *vi;
1127
1128   /* Create sensor anomaly info context */
1129   ai = GNUNET_new (struct AnomalyInfo);
1130
1131   ai->sensor = sensor;
1132   ai->anomalous = GNUNET_NO;
1133   ai->anomalous_neighbors =
1134       GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1135   ai->report_block = NULL;
1136   ai->report_creation_cx = NULL;
1137   GNUNET_CONTAINER_DLL_insert (ai_head, ai_tail, ai);
1138   /* Create sensor value info context (if needed to be reported) */
1139   if (NULL == sensor->collection_point || GNUNET_NO == sensor->report_values)
1140     return GNUNET_YES;
1141   LOG (GNUNET_ERROR_TYPE_INFO,
1142        "Reporting sensor `%s' values to collection point `%s' every %s.\n",
1143        sensor->name, GNUNET_i2s_full (sensor->collection_point),
1144        GNUNET_STRINGS_relative_time_to_string (sensor->value_reporting_interval,
1145                                                GNUNET_YES));
1146   vi = GNUNET_new (struct ValueInfo);
1147   vi->sensor = sensor;
1148   vi->last_value = NULL;
1149   vi->last_value_size = 0;
1150   vi->last_value_reported = GNUNET_NO;
1151   vi->wc =
1152       GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
1153                               &value_watch_cb, vi);
1154   vi->reporting_task =
1155       GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
1156                                     &report_value, vi);
1157   GNUNET_CONTAINER_DLL_insert (vi_head, vi_tail, vi);
1158   return GNUNET_YES;
1159 }
1160
1161
1162 /**
1163  * Start the sensor anomaly reporting module
1164  *
1165  * @param c our service configuration
1166  * @param s multihashmap of loaded sensors
1167  * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
1168  */
1169 int
1170 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
1171                         struct GNUNET_CONTAINER_MultiHashMap *s)
1172 {
1173   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1174     {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P,
1175      sizeof (struct GNUNET_MessageHeader) +
1176      sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
1177      sizeof (struct GNUNET_SENSOR_AnomalyReportMessage)},
1178     {NULL, 0, 0}
1179   };
1180   static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1181     {NULL, 0, 0}
1182   };
1183
1184   LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n");
1185   GNUNET_assert (NULL != s);
1186   sensors = s;
1187   cfg = c;
1188   if (GNUNET_OK !=
1189       GNUNET_CONFIGURATION_get_value_number (cfg, "sensor-reporting",
1190                                              "POW_MATCHING_BITS",
1191                                              &pow_matching_bits))
1192   {
1193     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "sensor-reporting",
1194                                "POW_MATCHING_BITS");
1195     SENSOR_reporting_stop ();
1196     return GNUNET_SYSERR;
1197   }
1198   if (pow_matching_bits > sizeof (struct GNUNET_HashCode))
1199   {
1200     LOG (GNUNET_ERROR_TYPE_ERROR, "Matching bits value too large (%d > %d).\n",
1201          pow_matching_bits, sizeof (struct GNUNET_HashCode));
1202     SENSOR_reporting_stop ();
1203     return GNUNET_SYSERR;
1204   }
1205   /* Connect to PEERSTORE */
1206   peerstore = GNUNET_PEERSTORE_connect (cfg);
1207   if (NULL == peerstore)
1208   {
1209     LOG (GNUNET_ERROR_TYPE_ERROR,
1210          _("Failed to connect to peerstore service.\n"));
1211     SENSOR_reporting_stop ();
1212     return GNUNET_SYSERR;
1213   }
1214   /* Connect to CORE */
1215   core =
1216       GNUNET_CORE_connect (cfg, NULL, &core_startup_cb, core_connect_cb,
1217                            &core_disconnect_cb, NULL, GNUNET_YES, NULL,
1218                            GNUNET_YES, core_handlers);
1219   if (NULL == core)
1220   {
1221     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
1222     SENSOR_reporting_stop ();
1223     return GNUNET_SYSERR;
1224   }
1225   /* Connect to CADET */
1226   cadet =
1227       GNUNET_CADET_connect (cfg, NULL, NULL, &cadet_channel_destroyed,
1228                             cadet_handlers, NULL);
1229   if (NULL == cadet)
1230   {
1231     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CADET service.\n"));
1232     SENSOR_reporting_stop ();
1233     return GNUNET_SYSERR;
1234   }
1235   private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (cfg);
1236   if (NULL == private_key)
1237   {
1238     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to load my private key.\n"));
1239     SENSOR_reporting_stop ();
1240     return GNUNET_SYSERR;
1241   }
1242   GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid);
1243   GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL);
1244   neighborhood = 0;
1245   module_running = GNUNET_YES;
1246   return GNUNET_OK;
1247 }
1248
1249 /* end of gnunet-service-sensor_reporting.c */