If 0 max search, use r5n dht
[oweals/gnunet.git] / src / sensor / gnunet-service-sensor_reporting.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file sensor/gnunet-service-sensor_reporting.c
23  * @brief sensor service reporting functionality
24  * @author Omar Tarabai
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "sensor.h"
29 #include "gnunet_peerstore_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
35
36
37 /**
38  * When we are still generating a proof-of-work and we need to send an anomaly
39  * report, we queue them until the generation is complete
40  */
41 struct AnomalyReportingQueueItem
42 {
43
44   /**
45    * DLL
46    */
47   struct AnomalyReportingQueueItem *prev;
48
49   /**
50    * DLL
51    */
52   struct AnomalyReportingQueueItem *next;
53
54   /**
55    * Message queue belonging to the peer that is the destination of the report
56    */
57   struct GNUNET_MQ_Handle *dest_mq;
58
59   /**
60    * Report type
61    */
62   int type;
63
64 };
65
66 struct AnomalyInfo
67 {
68
69   /**
70    * DLL
71    */
72   struct AnomalyInfo *prev;
73
74   /**
75    * DLL
76    */
77   struct AnomalyInfo *next;
78
79   /**
80    * Sensor information
81    */
82   struct GNUNET_SENSOR_SensorInfo *sensor;
83
84   /**
85    * Current anomalous status of sensor
86    */
87   int anomalous;
88
89   /**
90    * List of peers that reported an anomaly for this sensor
91    */
92   struct GNUNET_CONTAINER_MultiPeerMap *anomalous_neighbors;
93
94   /**
95    * Report block with proof-of-work and signature
96    */
97   struct GNUNET_SENSOR_crypto_pow_block *report_block;
98
99   /**
100    * Context of an operation creating pow and signature
101    */
102   struct GNUNET_SENSOR_crypto_pow_context *report_creation_cx;
103
104   /**
105    * Head of the queue of pending report destinations
106    */
107   struct AnomalyReportingQueueItem *reporting_queue_head;
108
109   /**
110    * Head of the queue of pending report destinations
111    */
112   struct AnomalyReportingQueueItem *reporting_queue_tail;
113
114 };
115
116 struct ValueInfo
117 {
118
119   /**
120    * DLL
121    */
122   struct ValueInfo *prev;
123
124   /**
125    * DLL
126    */
127   struct ValueInfo *next;
128
129   /**
130    * Sensor information
131    */
132   struct GNUNET_SENSOR_SensorInfo *sensor;
133
134   /**
135    * Last value read from sensor
136    */
137   void *last_value;
138
139   /**
140    * Size of @e last_value
141    */
142   size_t last_value_size;
143
144   /**
145    * Timestamp of last value reading
146    */
147   struct GNUNET_TIME_Absolute last_value_timestamp;
148
149   /**
150    * Has the last value seen already been reported to collection point?
151    */
152   int last_value_reported;
153
154   /**
155    * Watcher of sensor values
156    */
157   struct GNUNET_PEERSTORE_WatchContext *wc;
158
159   /**
160    * Collection point reporting task (or #GNUNET_SCHEDULER_NO_TASK)
161    */
162   GNUNET_SCHEDULER_TaskIdentifier reporting_task;
163
164 };
165
166 /**
167  * Information about a connected CORE peer.
168  * Note that we only know about a connected peer if it is running the same
169  * application (sensor anomaly reporting) as us.
170  */
171 struct CorePeer
172 {
173
174   /**
175    * DLL
176    */
177   struct CorePeer *prev;
178
179   /**
180    * DLL
181    */
182   struct CorePeer *next;
183
184   /**
185    * Peer identity of connected peer
186    */
187   struct GNUNET_PeerIdentity *peer_id;
188
189   /**
190    * Message queue for messages to be sent to this peer
191    */
192   struct GNUNET_MQ_Handle *mq;
193
194 };
195
196 /**
197  * Information about a connected CADET peer (collection point).
198  */
199 struct CadetPeer
200 {
201
202   /**
203    * DLL
204    */
205   struct CadetPeer *prev;
206
207   /**
208    * DLL
209    */
210   struct CadetPeer *next;
211
212   /**
213    * Peer Identity
214    */
215   struct GNUNET_PeerIdentity peer_id;
216
217   /**
218    * CADET channel handle
219    */
220   struct GNUNET_CADET_Channel *channel;
221
222   /**
223    * Message queue for messages to be sent to this peer
224    */
225   struct GNUNET_MQ_Handle *mq;
226
227   /**
228    * Are we currently destroying the channel and its context?
229    */
230   int destroying;
231
232 };
233
234
235 /**
236  * Our configuration.
237  */
238 static const struct GNUNET_CONFIGURATION_Handle *cfg;
239
240 /**
241  * Multihashmap of loaded sensors
242  */
243 static struct GNUNET_CONTAINER_MultiHashMap *sensors;
244
245 /**
246  * Handle to peerstore service
247  */
248 static struct GNUNET_PEERSTORE_Handle *peerstore;
249
250 /**
251  * Handle to core service
252  */
253 static struct GNUNET_CORE_Handle *core;
254
255 /**
256  * Handle to CADET service
257  */
258 static struct GNUNET_CADET_Handle *cadet;
259
260 /**
261  * My peer id
262  */
263 static struct GNUNET_PeerIdentity mypeerid;
264
265 /**
266  * My private key
267  */
268 static struct GNUNET_CRYPTO_EddsaPrivateKey *private_key;
269
270 /**
271  * Head of DLL of anomaly info structs
272  */
273 static struct AnomalyInfo *ai_head;
274
275 /**
276  * Tail of DLL of anomaly info structs
277  */
278 static struct AnomalyInfo *ai_tail;
279
280 /**
281  * Head of DLL of value info structs
282  */
283 static struct ValueInfo *vi_head;
284
285 /**
286  * Tail of DLL of value info structs
287  */
288 static struct ValueInfo *vi_tail;
289
290 /**
291  * Head of DLL of CORE peers
292  */
293 static struct CorePeer *corep_head;
294
295 /**
296  * Tail of DLL of CORE peers
297  */
298 static struct CorePeer *corep_tail;
299
300 /**
301  * Head of DLL of CADET peers
302  */
303 static struct CadetPeer *cadetp_head;
304
305 /**
306  * Tail of DLL of CADET peers
307  */
308 static struct CadetPeer *cadetp_tail;
309
310 /**
311  * Is the module started?
312  */
313 static int module_running = GNUNET_NO;
314
315 /**
316  * Number of known neighborhood peers
317  */
318 static int neighborhood;
319
320 /**
321  * Parameter that defines the complexity of the proof-of-work
322  */
323 static long long unsigned int pow_matching_bits;
324
325
326
327 /******************************************************************************/
328 /******************************      CLEANUP     ******************************/
329 /******************************************************************************/
330
331 /**
332  * Destroy anomaly info struct
333  *
334  * @param ai struct to destroy
335  */
336 static void
337 destroy_anomaly_info (struct AnomalyInfo *ai)
338 {
339   struct AnomalyReportingQueueItem *ar_item;
340
341   ar_item = ai->reporting_queue_head;
342   while (NULL != ar_item)
343   {
344     GNUNET_CONTAINER_DLL_remove (ai->reporting_queue_head,
345                                  ai->reporting_queue_tail, ar_item);
346     GNUNET_free (ar_item);
347     ar_item = ai->reporting_queue_head;
348   }
349   if (NULL != ai->report_creation_cx)
350   {
351     GNUNET_SENSOR_crypto_pow_sign_cancel (ai->report_creation_cx);
352     ai->report_creation_cx = NULL;
353   }
354   if (NULL != ai->report_block)
355   {
356     GNUNET_free (ai->report_block);
357     ai->report_block = NULL;
358   }
359   if (NULL != ai->anomalous_neighbors)
360   {
361     GNUNET_CONTAINER_multipeermap_destroy (ai->anomalous_neighbors);
362     ai->anomalous_neighbors = NULL;
363   }
364   GNUNET_free (ai);
365 }
366
367
368 /**
369  * Destroy value info struct
370  *
371  * @param vi struct to destroy
372  */
373 static void
374 destroy_value_info (struct ValueInfo *vi)
375 {
376   if (NULL != vi->wc)
377   {
378     GNUNET_PEERSTORE_watch_cancel (vi->wc);
379     vi->wc = NULL;
380   }
381   if (GNUNET_SCHEDULER_NO_TASK != vi->reporting_task)
382   {
383     GNUNET_SCHEDULER_cancel (vi->reporting_task);
384     vi->reporting_task = GNUNET_SCHEDULER_NO_TASK;
385   }
386   if (NULL != vi->last_value)
387   {
388     GNUNET_free (vi->last_value);
389     vi->last_value = NULL;
390   }
391   GNUNET_free (vi);
392 }
393
394
395 /**
396  * Destroy core peer struct
397  *
398  * @param corep struct to destroy
399  */
400 static void
401 destroy_core_peer (struct CorePeer *corep)
402 {
403   struct AnomalyInfo *ai;
404
405   if (NULL != corep->mq)
406   {
407     GNUNET_MQ_destroy (corep->mq);
408     corep->mq = NULL;
409   }
410   ai = ai_head;
411   while (NULL != ai)
412   {
413     GNUNET_assert (NULL != ai->anomalous_neighbors);
414     GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors,
415                                               corep->peer_id);
416     ai = ai->next;
417   }
418   GNUNET_free (corep);
419 }
420
421
422 /**
423  * Destroy cadet peer struct
424  *
425  * @param cadetp struct to destroy
426  */
427 static void
428 destroy_cadet_peer (struct CadetPeer *cadetp)
429 {
430   cadetp->destroying = GNUNET_YES;
431   if (NULL != cadetp->mq)
432   {
433     GNUNET_MQ_destroy (cadetp->mq);
434     cadetp->mq = NULL;
435   }
436   if (NULL != cadetp->channel)
437   {
438     GNUNET_CADET_channel_destroy (cadetp->channel);
439     cadetp->channel = NULL;
440   }
441   GNUNET_free (cadetp);
442 }
443
444
445 /**
446  * Stop sensor reporting module
447  */
448 void
449 SENSOR_reporting_stop ()
450 {
451   struct ValueInfo *vi;
452   struct CorePeer *corep;
453   struct AnomalyInfo *ai;
454   struct CadetPeer *cadetp;
455
456   LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor anomaly reporting module.\n");
457   module_running = GNUNET_NO;
458   neighborhood = 0;
459   /* Destroy value info's */
460   vi = vi_head;
461   while (NULL != vi)
462   {
463     GNUNET_CONTAINER_DLL_remove (vi_head, vi_tail, vi);
464     destroy_value_info (vi);
465     vi = vi_head;
466   }
467   /* Destroy core peers */
468   corep = corep_head;
469   while (NULL != corep)
470   {
471     GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
472     destroy_core_peer (corep);
473     corep = corep_head;
474   }
475   /* Destroy anomaly info's */
476   ai = ai_head;
477   while (NULL != ai)
478   {
479     GNUNET_CONTAINER_DLL_remove (ai_head, ai_tail, ai);
480     destroy_anomaly_info (ai);
481     ai = ai_head;
482   }
483   /* Destroy cadet peers */
484   cadetp = cadetp_head;
485   while (NULL != cadetp)
486   {
487     GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
488     destroy_cadet_peer (cadetp);
489     cadetp = cadetp_head;
490   }
491   /* Disconnect from other services */
492   if (NULL != core)
493   {
494     GNUNET_CORE_disconnect (core);
495     core = NULL;
496   }
497   if (NULL != peerstore)
498   {
499     GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_NO);
500     peerstore = NULL;
501   }
502   if (NULL != cadet)
503   {
504     GNUNET_CADET_disconnect (cadet);
505     cadet = NULL;
506   }
507 }
508
509
510 /******************************************************************************/
511 /******************************      HELPERS     ******************************/
512 /******************************************************************************/
513
514
515 /**
516  * Gets the anomaly info struct related to the given sensor
517  *
518  * @param sensor Sensor to search by
519  */
520 static struct AnomalyInfo *
521 get_anomaly_info_by_sensor (struct GNUNET_SENSOR_SensorInfo *sensor)
522 {
523   struct AnomalyInfo *ai;
524
525   ai = ai_head;
526   while (NULL != ai)
527   {
528     if (ai->sensor == sensor)
529     {
530       return ai;
531     }
532     ai = ai->next;
533   }
534   return NULL;
535 }
536
537
538 /**
539  * Returns context of a connected CADET peer.
540  * Creates it first if didn't exist before.
541  *
542  * @param pid Peer Identity
543  * @return Context of connected CADET peer
544  */
545 static struct CadetPeer *
546 get_cadet_peer (struct GNUNET_PeerIdentity pid)
547 {
548   struct CadetPeer *cadetp;
549
550   cadetp = cadetp_head;
551   while (NULL != cadetp)
552   {
553     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cadetp->peer_id))
554       return cadetp;
555     cadetp = cadetp->next;
556   }
557   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating a CADET connection to peer `%s'.\n",
558        GNUNET_i2s (&pid));
559   /* Not found, create struct and channel */
560   cadetp = GNUNET_new (struct CadetPeer);
561   cadetp->peer_id = pid;
562   cadetp->channel =
563       GNUNET_CADET_channel_create (cadet, cadetp, &pid,
564                                    GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
565                                    GNUNET_CADET_OPTION_RELIABLE);
566   cadetp->mq = GNUNET_CADET_mq_create (cadetp->channel);
567   GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp);
568   return cadetp;
569 }
570
571
572 /**
573  * This function is called only when we have a block ready and want to send it
574  * to the given peer (represented by its message queue)
575  *
576  * @param mq Message queue to put the message in
577  * @param ai Anomaly info to report
578  * @param type Message type
579  */
580 static void
581 do_send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai,
582                         int type)
583 {
584   struct GNUNET_MessageHeader *msg;
585   struct GNUNET_MQ_Envelope *ev;
586   size_t block_size;
587
588   GNUNET_assert (NULL != ai->report_block);
589   block_size =
590       sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
591       ai->report_block->msg_size;
592   ev = GNUNET_MQ_msg_header_extra (msg, block_size, type);
593   memcpy (&msg[1], ai->report_block, block_size);
594   GNUNET_MQ_send (mq, ev);
595 }
596
597
598 /**
599  * Check if we have signed and proof-of-work block ready.
600  * If yes, we send the report directly, if no, we enqueue the reporting until
601  * the block is ready.
602  *
603  * @param mq Message queue to put the message in
604  * @param ai Anomaly info to report
605  * @param p2p Is the report sent to a neighboring peer
606  */
607 static void
608 send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai,
609                      int p2p)
610 {
611   struct AnomalyReportingQueueItem *ar_item;
612   int type;
613
614   type =
615       (GNUNET_YES ==
616        p2p) ? GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P :
617       GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT;
618   if (NULL == ai->report_block)
619   {
620     ar_item = GNUNET_new (struct AnomalyReportingQueueItem);
621
622     ar_item->dest_mq = mq;
623     ar_item->type = type;
624     GNUNET_CONTAINER_DLL_insert_tail (ai->reporting_queue_head,
625                                       ai->reporting_queue_tail, ar_item);
626   }
627   else
628   {
629     do_send_anomaly_report (mq, ai, type);
630   }
631 }
632
633
634 /**
635  * Callback when the crypto module finished created proof-of-work and signature
636  * for an anomaly report.
637  *
638  * @param cls Closure, a `struct AnomalyInfo *`
639  * @param block The resulting block, NULL on error
640  */
641 static void
642 report_creation_cb (void *cls, struct GNUNET_SENSOR_crypto_pow_block *block)
643 {
644   struct AnomalyInfo *ai = cls;
645   struct AnomalyReportingQueueItem *ar_item;
646
647   ai->report_creation_cx = NULL;
648   if (NULL != ai->report_block)
649   {
650     LOG (GNUNET_ERROR_TYPE_ERROR,
651          _("Double creation of proof-of-work, this should not happen.\n"));
652     return;
653   }
654   if (NULL == block)
655   {
656     LOG (GNUNET_ERROR_TYPE_ERROR,
657          _("Failed to create pow and signature block.\n"));
658     return;
659   }
660   LOG (GNUNET_ERROR_TYPE_DEBUG, "Anomaly report POW block ready.\n");
661   ai->report_block = block;
662   ar_item = ai->reporting_queue_head;
663   while (NULL != ar_item)
664   {
665     GNUNET_CONTAINER_DLL_remove (ai->reporting_queue_head,
666                                  ai->reporting_queue_tail, ar_item);
667     do_send_anomaly_report (ar_item->dest_mq, ai, ar_item->type);
668     GNUNET_free (ar_item);
669     ar_item = ai->reporting_queue_head;
670   }
671 }
672
673
674 /**
675  * When a change to the anomaly info of a sensor is done, this function should
676  * be called to create the message, its proof-of-work and signuature ready to
677  * be sent to other peers or collection point.
678  *
679  * @param ai Anomaly Info struct
680  */
681 static void
682 update_anomaly_report_pow_block (struct AnomalyInfo *ai)
683 {
684   struct GNUNET_SENSOR_AnomalyReportMessage *arm;
685   struct GNUNET_TIME_Absolute timestamp;
686
687   LOG (GNUNET_ERROR_TYPE_DEBUG,
688        "Updating anomaly report POW block due to data change.\n");
689   if (NULL != ai->report_block)
690   {
691     GNUNET_free (ai->report_block);
692     ai->report_block = NULL;
693   }
694   if (NULL != ai->report_creation_cx)
695   {
696     /* If a creation is already running, cancel it because the data changed */
697     GNUNET_SENSOR_crypto_pow_sign_cancel (ai->report_creation_cx);
698     ai->report_creation_cx = NULL;
699   }
700   arm = GNUNET_new (struct GNUNET_SENSOR_AnomalyReportMessage);
701
702   GNUNET_CRYPTO_hash (ai->sensor->name, strlen (ai->sensor->name) + 1,
703                       &arm->sensorname_hash);
704   arm->sensorversion_major = htons (ai->sensor->version_major);
705   arm->sensorversion_minor = htons (ai->sensor->version_minor);
706   arm->anomalous = htons (ai->anomalous);
707   arm->anomalous_neighbors =
708       (0 ==
709        neighborhood) ? 0 : ((float)
710                             GNUNET_CONTAINER_multipeermap_size
711                             (ai->anomalous_neighbors)) / neighborhood;
712   timestamp = GNUNET_TIME_absolute_get ();
713   ai->report_creation_cx =
714       GNUNET_SENSOR_crypto_pow_sign (arm,
715                                      sizeof (struct
716                                              GNUNET_SENSOR_AnomalyReportMessage),
717                                      &timestamp, &mypeerid.public_key,
718                                      private_key, pow_matching_bits,
719                                      &report_creation_cb, ai);
720 }
721
722
723 /**
724  * Create a sensor value message from a given value info struct inside a MQ
725  * envelope.
726  *
727  * @param vi Value info struct to use
728  * @return Envelope with message
729  */
730 static struct GNUNET_MQ_Envelope *
731 create_value_message (struct ValueInfo *vi)
732 {
733   struct GNUNET_SENSOR_ValueMessage *vm;
734   struct GNUNET_MQ_Envelope *ev;
735
736   ev = GNUNET_MQ_msg_extra (vm, vi->last_value_size,
737                             GNUNET_MESSAGE_TYPE_SENSOR_READING);
738   GNUNET_CRYPTO_hash (vi->sensor->name, strlen (vi->sensor->name) + 1,
739                       &vm->sensorname_hash);
740   vm->sensorversion_major = htons (vi->sensor->version_major);
741   vm->sensorversion_minor = htons (vi->sensor->version_minor);
742   vm->timestamp = vi->last_value_timestamp;
743   vm->value_size = htons (vi->last_value_size);
744   memcpy (&vm[1], vi->last_value, vi->last_value_size);
745   return ev;
746 }
747
748
749 /******************************************************************************/
750 /***************************      CORE Handlers     ***************************/
751 /******************************************************************************/
752
753
754 /**
755  * An inbound anomaly report is received from a peer through CORE.
756  *
757  * @param cls closure (unused)
758  * @param peer the other peer involved
759  * @param message the actual message
760  * @return #GNUNET_OK to keep the connection open,
761  *         #GNUNET_SYSERR to close connection to the peer (signal serious error)
762  */
763 static int
764 handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other,
765                        const struct GNUNET_MessageHeader *message)
766 {
767   struct GNUNET_SENSOR_crypto_pow_block *report_block;
768   struct GNUNET_SENSOR_AnomalyReportMessage *arm;
769   struct GNUNET_SENSOR_SensorInfo *sensor;
770   struct AnomalyInfo *my_anomaly_info;
771   struct CadetPeer *cadetp;
772   int peer_anomalous;
773   int peer_in_anomalous_list;
774
775   /* Verify proof-of-work, signature and extract report message */
776   report_block = (struct GNUNET_SENSOR_crypto_pow_block *) &message[1];
777   if (sizeof (struct GNUNET_SENSOR_AnomalyReportMessage) !=
778       GNUNET_SENSOR_crypto_verify_pow_sign (report_block, pow_matching_bits,
779                                             (struct GNUNET_CRYPTO_EddsaPublicKey
780                                              *) &other->public_key,
781                                             (void **) &arm))
782   {
783     LOG (GNUNET_ERROR_TYPE_WARNING,
784          "Received invalid anomaly report from peer `%s'.\n",
785          GNUNET_i2s (other));
786     GNUNET_break_op (0);
787     return GNUNET_SYSERR;
788   }
789   /* Now we parse the content of the message */
790   sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &arm->sensorname_hash);
791   if (NULL == sensor ||
792       sensor->version_major != ntohs (arm->sensorversion_major) ||
793       sensor->version_minor != ntohs (arm->sensorversion_minor))
794   {
795     LOG (GNUNET_ERROR_TYPE_WARNING,
796          "I don't have the sensor reported by the peer `%s'.\n",
797          GNUNET_i2s (other));
798     return GNUNET_OK;
799   }
800   my_anomaly_info = get_anomaly_info_by_sensor (sensor);
801   GNUNET_assert (NULL != my_anomaly_info);
802   peer_in_anomalous_list =
803       GNUNET_CONTAINER_multipeermap_contains
804       (my_anomaly_info->anomalous_neighbors, other);
805   peer_anomalous = ntohs (arm->anomalous);
806   LOG (GNUNET_ERROR_TYPE_DEBUG,
807        "Received an anomaly update from neighbour `%s' (%d).\n",
808        GNUNET_i2s (other), peer_anomalous);
809   if (GNUNET_YES == peer_anomalous)
810   {
811     if (GNUNET_YES == peer_in_anomalous_list)   /* repeated positive report */
812       GNUNET_break_op (0);
813     else
814       GNUNET_CONTAINER_multipeermap_put (my_anomaly_info->anomalous_neighbors,
815                                          other, NULL,
816                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
817   }
818   else
819   {
820     if (GNUNET_NO == peer_in_anomalous_list)    /* repeated negative report */
821       GNUNET_break_op (0);
822     else
823       GNUNET_CONTAINER_multipeermap_remove_all
824           (my_anomaly_info->anomalous_neighbors, other);
825   }
826   /* This is important to create an updated block since the data changed */
827   update_anomaly_report_pow_block (my_anomaly_info);
828   /* Send anomaly update to collection point only if I have the same anomaly */
829   if (GNUNET_YES == my_anomaly_info->anomalous &&
830       NULL != sensor->collection_point &&
831       GNUNET_YES == sensor->report_anomalies)
832   {
833     LOG (GNUNET_ERROR_TYPE_DEBUG,
834          "Neighbor update triggered sending anomaly report to collection point `%s'.\n",
835          GNUNET_i2s (sensor->collection_point));
836     cadetp = get_cadet_peer (*sensor->collection_point);
837     send_anomaly_report (cadetp->mq, my_anomaly_info, GNUNET_NO);
838   }
839   return GNUNET_OK;
840 }
841
842
843 /******************************************************************************/
844 /************************      PEERSTORE callbacks     ************************/
845 /******************************************************************************/
846
847
848 /**
849  * Sensor value watch callback
850  *
851  * @param cls Closure, ValueInfo struct related to the sensor we are watching
852  * @param record PEERSTORE new record, NULL if error
853  * @param emsg Error message, NULL if no error
854  * @return GNUNET_YES to continue watching
855  */
856 static int
857 value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
858 {
859   struct ValueInfo *vi = cls;
860
861   if (NULL != emsg)
862   {
863     LOG (GNUNET_ERROR_TYPE_ERROR, _("PEERSTORE error: %s.\n"), emsg);
864     return GNUNET_YES;
865   }
866   if (NULL != vi->last_value)
867   {
868     GNUNET_free (vi->last_value);
869     vi->last_value_size = 0;
870   }
871   vi->last_value = GNUNET_memdup (record->value, record->value_size);
872   vi->last_value_size = record->value_size;
873   vi->last_value_timestamp = GNUNET_TIME_absolute_get ();
874   vi->last_value_reported = GNUNET_NO;
875   return GNUNET_YES;
876 }
877
878
879 /******************************************************************************/
880 /**************************      CORE callbacks     ***************************/
881 /******************************************************************************/
882
883
884 /**
885  * Method called whenever a CORE peer disconnects.
886  *
887  * @param cls closure (unused)
888  * @param peer peer identity this notification is about
889  */
890 static void
891 core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
892 {
893   struct CorePeer *corep;
894
895   if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
896     return;
897   LOG (GNUNET_ERROR_TYPE_DEBUG, "Core peer `%s' disconnected.\n",
898        GNUNET_i2s (peer));
899   neighborhood--;
900   corep = corep_head;
901   while (NULL != corep)
902   {
903     if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, corep->peer_id))
904     {
905       GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
906       destroy_core_peer (corep);
907       return;
908     }
909     corep = corep->next;
910   }
911 }
912
913
914 /**
915  * Method called whenever a given peer connects through CORE.
916  *
917  * @param cls closure (unused)
918  * @param peer peer identity this notification is about
919  */
920 static void
921 core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
922 {
923   struct CorePeer *corep;
924   struct AnomalyInfo *ai;
925
926   if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
927     return;
928   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core peer `%s'.\n",
929        GNUNET_i2s (peer));
930   neighborhood++;
931   corep = GNUNET_new (struct CorePeer);
932   corep->peer_id = (struct GNUNET_PeerIdentity *) peer;
933   corep->mq = GNUNET_CORE_mq_create (core, peer);
934   GNUNET_CONTAINER_DLL_insert (corep_head, corep_tail, corep);
935   /* Send any locally anomalous sensors to the new peer */
936   ai = ai_head;
937   while (NULL != ai)
938   {
939     if (GNUNET_YES == ai->anomalous)
940     {
941       LOG (GNUNET_ERROR_TYPE_DEBUG,
942            "Updating newly connected neighbor `%s' with anomalous sensor.\n",
943            GNUNET_i2s (peer));
944       send_anomaly_report (corep->mq, ai, GNUNET_YES);
945     }
946     ai = ai->next;
947   }
948 }
949
950
951 /**
952  * Function called after #GNUNET_CORE_connect has succeeded (or failed
953  * for good).  Note that the private key of the peer is intentionally
954  * not exposed here; if you need it, your process should try to read
955  * the private key file directly (which should work if you are
956  * authorized...).  Implementations of this function must not call
957  * #GNUNET_CORE_disconnect (other than by scheduling a new task to
958  * do this later).
959  *
960  * @param cls closure (unused)
961  * @param my_identity ID of this peer, NULL if we failed
962  */
963 static void
964 core_startup_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity)
965 {
966   if (NULL == my_identity)
967   {
968     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
969     SENSOR_reporting_stop ();
970     return;
971   }
972   if (0 != GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, my_identity))
973   {
974     LOG (GNUNET_ERROR_TYPE_ERROR,
975          _("Peer identity received from CORE init doesn't match ours.\n"));
976     SENSOR_reporting_stop ();
977     return;
978   }
979 }
980
981
982 /******************************************************************************/
983 /*************************      CADET callbacks     ***************************/
984 /******************************************************************************/
985
986 /**
987  * Function called whenever a channel is destroyed.  Should clean up
988  * any associated state.
989  *
990  * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
991  *
992  * @param cls closure (set from #GNUNET_CADET_connect)
993  * @param channel connection to the other end (henceforth invalid)
994  * @param channel_ctx place where local state associated
995  *                   with the channel is stored
996  */
997 static void
998 cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel,
999                          void *channel_ctx)
1000 {
1001   struct CadetPeer *cadetp = channel_ctx;
1002
1003   if (GNUNET_YES == cadetp->destroying)
1004     return;
1005   LOG (GNUNET_ERROR_TYPE_DEBUG,
1006        "CADET channel was destroyed by remote peer `%s' or failed to start.\n",
1007        GNUNET_i2s (&cadetp->peer_id));
1008   GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
1009   cadetp->channel = NULL;
1010   destroy_cadet_peer (cadetp);
1011 }
1012
1013
1014 /******************************************************************************/
1015 /**********************      Local anomaly receiver     ***********************/
1016 /******************************************************************************/
1017
1018
1019 /**
1020  * Used by the analysis module to tell the reporting module about a change in
1021  * the anomaly status of a sensor.
1022  *
1023  * @param sensor Related sensor
1024  * @param anomalous The new sensor anomalous status
1025  */
1026 void
1027 SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor,
1028                                  int anomalous)
1029 {
1030   struct AnomalyInfo *ai;
1031   struct CorePeer *corep;
1032   struct CadetPeer *cadetp;
1033
1034   if (GNUNET_NO == module_running)
1035     return;
1036   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received an external anomaly update.\n");
1037   ai = get_anomaly_info_by_sensor (sensor);
1038   GNUNET_assert (NULL != ai);
1039   ai->anomalous = anomalous;
1040   /* This is important to create an updated block since the data changed */
1041   update_anomaly_report_pow_block (ai);
1042   /* Report change to all neighbors */
1043   corep = corep_head;
1044   while (NULL != corep)
1045   {
1046     LOG (GNUNET_ERROR_TYPE_DEBUG,
1047          "Sending an anomaly report to neighbor `%s'.\n",
1048          GNUNET_i2s (corep->peer_id));
1049     send_anomaly_report (corep->mq, ai, GNUNET_YES);
1050     corep = corep->next;
1051   }
1052   /* Report change to collection point if need */
1053   if (NULL != ai->sensor->collection_point &&
1054       GNUNET_YES == ai->sensor->report_anomalies)
1055   {
1056     LOG (GNUNET_ERROR_TYPE_DEBUG,
1057          "Local anomaly update triggered sending anomaly report to collection point `%s'.\n",
1058          GNUNET_i2s (ai->sensor->collection_point));
1059     cadetp = get_cadet_peer (*ai->sensor->collection_point);
1060     send_anomaly_report (cadetp->mq, ai, GNUNET_NO);
1061   }
1062 }
1063
1064
1065 /******************************************************************************/
1066 /*******************      Reporting values (periodic)     *********************/
1067 /******************************************************************************/
1068
1069
1070 /**
1071  * Task scheduled to send values to collection point
1072  *
1073  * @param cls closure, a `struct ValueReportingContext *`
1074  * @param tc unused
1075  */
1076 static void
1077 report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1078 {
1079   struct ValueInfo *vi = cls;
1080   struct GNUNET_SENSOR_SensorInfo *sensor = vi->sensor;
1081   struct CadetPeer *cadetp;
1082   struct GNUNET_MQ_Envelope *ev;
1083
1084   vi->reporting_task =
1085       GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
1086                                     &report_value, vi);
1087   if (0 == vi->last_value_size || GNUNET_YES == vi->last_value_reported)
1088   {
1089     LOG (GNUNET_ERROR_TYPE_WARNING,
1090          "Did not receive a fresh value from `%s' to report.\n", sensor->name);
1091     return;
1092   }
1093   LOG (GNUNET_ERROR_TYPE_DEBUG,
1094        "Now trying to report last seen value of `%s' to collection point.\n",
1095        sensor->name);
1096   cadetp = get_cadet_peer (*sensor->collection_point);
1097   ev = create_value_message (vi);
1098   GNUNET_MQ_send (cadetp->mq, ev);
1099   vi->last_value_reported = GNUNET_YES;
1100 }
1101
1102
1103 /******************************************************************************/
1104 /********************************      INIT     *******************************/
1105 /******************************************************************************/
1106
1107
1108 /**
1109  * Iterator for defined sensors and creates anomaly info context
1110  *
1111  * @param cls unused
1112  * @param key unused
1113  * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information
1114  * @return #GNUNET_YES to continue iterations
1115  */
1116 static int
1117 init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
1118                        void *value)
1119 {
1120   struct GNUNET_SENSOR_SensorInfo *sensor = value;
1121   struct AnomalyInfo *ai;
1122   struct ValueInfo *vi;
1123
1124   /* Create sensor anomaly info context */
1125   ai = GNUNET_new (struct AnomalyInfo);
1126
1127   ai->sensor = sensor;
1128   ai->anomalous = GNUNET_NO;
1129   ai->anomalous_neighbors =
1130       GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1131   ai->report_block = NULL;
1132   ai->report_creation_cx = NULL;
1133   GNUNET_CONTAINER_DLL_insert (ai_head, ai_tail, ai);
1134   /* Create sensor value info context (if needed to be reported) */
1135   if (NULL == sensor->collection_point || GNUNET_NO == sensor->report_values)
1136     return GNUNET_YES;
1137   LOG (GNUNET_ERROR_TYPE_INFO,
1138        "Reporting sensor `%s' values to collection point `%s' every %s.\n",
1139        sensor->name, GNUNET_i2s_full (sensor->collection_point),
1140        GNUNET_STRINGS_relative_time_to_string (sensor->value_reporting_interval,
1141                                                GNUNET_YES));
1142   vi = GNUNET_new (struct ValueInfo);
1143   vi->sensor = sensor;
1144   vi->last_value = NULL;
1145   vi->last_value_size = 0;
1146   vi->last_value_reported = GNUNET_NO;
1147   vi->wc =
1148       GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
1149                               &value_watch_cb, vi);
1150   vi->reporting_task =
1151       GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
1152                                     &report_value, vi);
1153   GNUNET_CONTAINER_DLL_insert (vi_head, vi_tail, vi);
1154   return GNUNET_YES;
1155 }
1156
1157
1158 /**
1159  * Start the sensor anomaly reporting module
1160  *
1161  * @param c our service configuration
1162  * @param s multihashmap of loaded sensors
1163  * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
1164  */
1165 int
1166 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
1167                         struct GNUNET_CONTAINER_MultiHashMap *s)
1168 {
1169   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1170     {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P,
1171      sizeof (struct GNUNET_MessageHeader) +
1172      sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
1173      sizeof (struct GNUNET_SENSOR_AnomalyReportMessage)},
1174     {NULL, 0, 0}
1175   };
1176   static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1177     {NULL, 0, 0}
1178   };
1179
1180   LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n");
1181   GNUNET_assert (NULL != s);
1182   sensors = s;
1183   cfg = c;
1184   if (GNUNET_OK !=
1185       GNUNET_CONFIGURATION_get_value_number (cfg, "sensor-reporting",
1186                                              "POW_MATCHING_BITS",
1187                                              &pow_matching_bits))
1188   {
1189     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "sensor-reporting",
1190                                "POW_MATCHING_BITS");
1191     SENSOR_reporting_stop ();
1192     return GNUNET_SYSERR;
1193   }
1194   if (pow_matching_bits > sizeof (struct GNUNET_HashCode))
1195   {
1196     LOG (GNUNET_ERROR_TYPE_ERROR, "Matching bits value too large (%d > %d).\n",
1197          pow_matching_bits, sizeof (struct GNUNET_HashCode));
1198     SENSOR_reporting_stop ();
1199     return GNUNET_SYSERR;
1200   }
1201   /* Connect to PEERSTORE */
1202   peerstore = GNUNET_PEERSTORE_connect (cfg);
1203   if (NULL == peerstore)
1204   {
1205     LOG (GNUNET_ERROR_TYPE_ERROR,
1206          _("Failed to connect to peerstore service.\n"));
1207     SENSOR_reporting_stop ();
1208     return GNUNET_SYSERR;
1209   }
1210   /* Connect to CORE */
1211   core =
1212       GNUNET_CORE_connect (cfg, NULL, &core_startup_cb, core_connect_cb,
1213                            &core_disconnect_cb, NULL, GNUNET_YES, NULL,
1214                            GNUNET_YES, core_handlers);
1215   if (NULL == core)
1216   {
1217     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
1218     SENSOR_reporting_stop ();
1219     return GNUNET_SYSERR;
1220   }
1221   /* Connect to CADET */
1222   cadet =
1223       GNUNET_CADET_connect (cfg, NULL, NULL, &cadet_channel_destroyed,
1224                             cadet_handlers, NULL);
1225   if (NULL == cadet)
1226   {
1227     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CADET service.\n"));
1228     SENSOR_reporting_stop ();
1229     return GNUNET_SYSERR;
1230   }
1231   private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (cfg);
1232   if (NULL == private_key)
1233   {
1234     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to load my private key.\n"));
1235     SENSOR_reporting_stop ();
1236     return GNUNET_SYSERR;
1237   }
1238   GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid);
1239   GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL);
1240   neighborhood = 0;
1241   module_running = GNUNET_YES;
1242   return GNUNET_OK;
1243 }
1244
1245 /* end of gnunet-service-sensor_reporting.c */