-keep track of messages passed to mq
[oweals/gnunet.git] / src / sensor / gnunet-service-sensor_reporting.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file sensor/gnunet-service-sensor_reporting.c
23  * @brief sensor service reporting functionality
24  * @author Omar Tarabai
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "sensor.h"
29 #include "gnunet_peerstore_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_cadet_service.h"
32 #include "gnunet_applications.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
35
36 /**
37  * Retry time when failing to connect to collection point
38  */
39 #define CP_RETRY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1)
40
41
42 /**
43  * When we are still generating a proof-of-work and we need to send an anomaly
44  * report, we queue them until the generation is complete
45  */
46 struct AnomalyReportingQueueItem
47 {
48
49   /**
50    * DLL
51    */
52   struct AnomalyReportingQueueItem *prev;
53
54   /**
55    * DLL
56    */
57   struct AnomalyReportingQueueItem *next;
58
59   /**
60    * Message queue belonging to the peer that is the destination of the report
61    */
62   struct GNUNET_MQ_Handle *dest_mq;
63
64   /**
65    * Report type
66    */
67   int type;
68
69 };
70
71 struct AnomalyInfo
72 {
73
74   /**
75    * DLL
76    */
77   struct AnomalyInfo *prev;
78
79   /**
80    * DLL
81    */
82   struct AnomalyInfo *next;
83
84   /**
85    * Sensor information
86    */
87   struct GNUNET_SENSOR_SensorInfo *sensor;
88
89   /**
90    * Current anomalous status of sensor
91    */
92   int anomalous;
93
94   /**
95    * List of peers that reported an anomaly for this sensor
96    */
97   struct GNUNET_CONTAINER_MultiPeerMap *anomalous_neighbors;
98
99   /**
100    * Report block with proof-of-work and signature
101    */
102   struct GNUNET_SENSOR_crypto_pow_block *report_block;
103
104   /**
105    * Context of an operation creating pow and signature
106    */
107   struct GNUNET_SENSOR_crypto_pow_context *report_creation_cx;
108
109   /**
110    * Head of the queue of pending report destinations
111    */
112   struct AnomalyReportingQueueItem *reporting_queue_head;
113
114   /**
115    * Head of the queue of pending report destinations
116    */
117   struct AnomalyReportingQueueItem *reporting_queue_tail;
118
119 };
120
121 struct ValueInfo
122 {
123
124   /**
125    * DLL
126    */
127   struct ValueInfo *prev;
128
129   /**
130    * DLL
131    */
132   struct ValueInfo *next;
133
134   /**
135    * Sensor information
136    */
137   struct GNUNET_SENSOR_SensorInfo *sensor;
138
139   /**
140    * Last value read from sensor
141    */
142   void *last_value;
143
144   /**
145    * Size of @e last_value
146    */
147   size_t last_value_size;
148
149   /**
150    * Timestamp of last value reading
151    */
152   struct GNUNET_TIME_Absolute last_value_timestamp;
153
154   /**
155    * Has the last value seen already been reported to collection point?
156    */
157   int last_value_reported;
158
159   /**
160    * Watcher of sensor values
161    */
162   struct GNUNET_PEERSTORE_WatchContext *wc;
163
164   /**
165    * Collection point reporting task (or NULL)
166    */
167   struct GNUNET_SCHEDULER_Task *reporting_task;
168
169 };
170
171 /**
172  * Information about a connected CORE peer.
173  * Note that we only know about a connected peer if it is running the same
174  * application (sensor anomaly reporting) as us.
175  */
176 struct CorePeer
177 {
178
179   /**
180    * DLL
181    */
182   struct CorePeer *prev;
183
184   /**
185    * DLL
186    */
187   struct CorePeer *next;
188
189   /**
190    * Peer identity of connected peer
191    */
192   struct GNUNET_PeerIdentity *peer_id;
193
194   /**
195    * Message queue for messages to be sent to this peer
196    */
197   struct GNUNET_MQ_Handle *mq;
198
199 };
200
201 /**
202  * Information about a connected CADET peer (collection point).
203  */
204 struct CadetPeer
205 {
206
207   /**
208    * DLL
209    */
210   struct CadetPeer *prev;
211
212   /**
213    * DLL
214    */
215   struct CadetPeer *next;
216
217   /**
218    * Peer Identity
219    */
220   struct GNUNET_PeerIdentity peer_id;
221
222   /**
223    * CADET channel handle
224    */
225   struct GNUNET_CADET_Channel *channel;
226
227   /**
228    * Message queue for messages to be sent to this peer
229    */
230   struct GNUNET_MQ_Handle *mq;
231
232   /**
233    * CADET transmit handle
234    */
235   struct GNUNET_CADET_TransmitHandle *th;
236
237   /**
238    * Task used to try reconnection to collection point after failure
239    */
240   struct GNUNET_SCHEDULER_Task * reconnect_task;
241
242   /**
243    * Are we currently destroying the channel and its context?
244    */
245   int destroying;
246
247 };
248
249
250 /**
251  * Our configuration.
252  */
253 static const struct GNUNET_CONFIGURATION_Handle *cfg;
254
255 /**
256  * Multihashmap of loaded sensors
257  */
258 static struct GNUNET_CONTAINER_MultiHashMap *sensors;
259
260 /**
261  * Handle to peerstore service
262  */
263 static struct GNUNET_PEERSTORE_Handle *peerstore;
264
265 /**
266  * Handle to core service
267  */
268 static struct GNUNET_CORE_Handle *core;
269
270 /**
271  * Handle to CADET service
272  */
273 static struct GNUNET_CADET_Handle *cadet;
274
275 /**
276  * My peer id
277  */
278 static struct GNUNET_PeerIdentity mypeerid;
279
280 /**
281  * My private key
282  */
283 static struct GNUNET_CRYPTO_EddsaPrivateKey *private_key;
284
285 /**
286  * Head of DLL of anomaly info structs
287  */
288 static struct AnomalyInfo *ai_head;
289
290 /**
291  * Tail of DLL of anomaly info structs
292  */
293 static struct AnomalyInfo *ai_tail;
294
295 /**
296  * Head of DLL of value info structs
297  */
298 static struct ValueInfo *vi_head;
299
300 /**
301  * Tail of DLL of value info structs
302  */
303 static struct ValueInfo *vi_tail;
304
305 /**
306  * Head of DLL of CORE peers
307  */
308 static struct CorePeer *corep_head;
309
310 /**
311  * Tail of DLL of CORE peers
312  */
313 static struct CorePeer *corep_tail;
314
315 /**
316  * Head of DLL of CADET peers
317  */
318 static struct CadetPeer *cadetp_head;
319
320 /**
321  * Tail of DLL of CADET peers
322  */
323 static struct CadetPeer *cadetp_tail;
324
325 /**
326  * Is the module started?
327  */
328 static int module_running = GNUNET_NO;
329
330 /**
331  * Number of known neighborhood peers
332  */
333 static int neighborhood;
334
335 /**
336  * Parameter that defines the complexity of the proof-of-work
337  */
338 static long long unsigned int pow_matching_bits;
339
340
341
342 /**
343  * Try reconnecting to collection point and send last queued message
344  */
345 static void
346 cp_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
347
348
349 /******************************************************************************/
350 /******************************      CLEANUP     ******************************/
351 /******************************************************************************/
352
353 /**
354  * Destroy anomaly info struct
355  *
356  * @param ai struct to destroy
357  */
358 static void
359 destroy_anomaly_info (struct AnomalyInfo *ai)
360 {
361   struct AnomalyReportingQueueItem *ar_item;
362
363   ar_item = ai->reporting_queue_head;
364   while (NULL != ar_item)
365   {
366     GNUNET_CONTAINER_DLL_remove (ai->reporting_queue_head,
367                                  ai->reporting_queue_tail, ar_item);
368     GNUNET_free (ar_item);
369     ar_item = ai->reporting_queue_head;
370   }
371   if (NULL != ai->report_creation_cx)
372   {
373     GNUNET_SENSOR_crypto_pow_sign_cancel (ai->report_creation_cx);
374     ai->report_creation_cx = NULL;
375   }
376   if (NULL != ai->report_block)
377   {
378     GNUNET_free (ai->report_block);
379     ai->report_block = NULL;
380   }
381   if (NULL != ai->anomalous_neighbors)
382   {
383     GNUNET_CONTAINER_multipeermap_destroy (ai->anomalous_neighbors);
384     ai->anomalous_neighbors = NULL;
385   }
386   GNUNET_free (ai);
387 }
388
389
390 /**
391  * Destroy value info struct
392  *
393  * @param vi struct to destroy
394  */
395 static void
396 destroy_value_info (struct ValueInfo *vi)
397 {
398   if (NULL != vi->wc)
399   {
400     GNUNET_PEERSTORE_watch_cancel (vi->wc);
401     vi->wc = NULL;
402   }
403   if (NULL != vi->reporting_task)
404   {
405     GNUNET_SCHEDULER_cancel (vi->reporting_task);
406     vi->reporting_task = NULL;
407   }
408   if (NULL != vi->last_value)
409   {
410     GNUNET_free (vi->last_value);
411     vi->last_value = NULL;
412   }
413   GNUNET_free (vi);
414 }
415
416
417 /**
418  * Destroy core peer struct
419  *
420  * @param corep struct to destroy
421  */
422 static void
423 destroy_core_peer (struct CorePeer *corep)
424 {
425   struct AnomalyInfo *ai;
426   struct AnomalyReportingQueueItem *ar_item;
427
428   ai = ai_head;
429   while (NULL != ai)
430   {
431     GNUNET_assert (NULL != ai->anomalous_neighbors);
432     GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors,
433                                               corep->peer_id);
434     /* Remove the core peer from any reporting queues */
435     ar_item = ai->reporting_queue_head;
436     while (NULL != ar_item)
437     {
438       if (ar_item->dest_mq == corep->mq)
439       {
440         GNUNET_CONTAINER_DLL_remove (ai->reporting_queue_head,
441                                      ai->reporting_queue_tail, ar_item);
442         break;
443       }
444       ar_item = ar_item->next;
445     }
446     ai = ai->next;
447   }
448   if (NULL != corep->mq)
449   {
450     GNUNET_MQ_destroy (corep->mq);
451     corep->mq = NULL;
452   }
453   GNUNET_free (corep);
454 }
455
456
457 /**
458  * Destroy cadet peer struct
459  *
460  * @param cadetp struct to destroy
461  */
462 static void
463 destroy_cadet_peer (struct CadetPeer *cadetp)
464 {
465   cadetp->destroying = GNUNET_YES;
466   if (NULL != cadetp->reconnect_task)
467   {
468     GNUNET_SCHEDULER_cancel (cadetp->reconnect_task);
469     cadetp->reconnect_task = NULL;
470   }
471   if (NULL != cadetp->mq)
472   {
473     GNUNET_MQ_destroy (cadetp->mq);
474     cadetp->mq = NULL;
475   }
476   if (NULL != cadetp->channel)
477   {
478     GNUNET_CADET_channel_destroy (cadetp->channel);
479     cadetp->channel = NULL;
480   }
481   GNUNET_free (cadetp);
482 }
483
484
485 /**
486  * Stop sensor reporting module
487  */
488 void
489 SENSOR_reporting_stop ()
490 {
491   struct ValueInfo *vi;
492   struct CorePeer *corep;
493   struct AnomalyInfo *ai;
494   struct CadetPeer *cadetp;
495
496   LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor anomaly reporting module.\n");
497   module_running = GNUNET_NO;
498   neighborhood = 0;
499   /* Destroy value info's */
500   vi = vi_head;
501   while (NULL != vi)
502   {
503     GNUNET_CONTAINER_DLL_remove (vi_head, vi_tail, vi);
504     destroy_value_info (vi);
505     vi = vi_head;
506   }
507   /* Destroy core peers */
508   corep = corep_head;
509   while (NULL != corep)
510   {
511     GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
512     destroy_core_peer (corep);
513     corep = corep_head;
514   }
515   /* Destroy anomaly info's */
516   ai = ai_head;
517   while (NULL != ai)
518   {
519     GNUNET_CONTAINER_DLL_remove (ai_head, ai_tail, ai);
520     destroy_anomaly_info (ai);
521     ai = ai_head;
522   }
523   /* Destroy cadet peers */
524   cadetp = cadetp_head;
525   while (NULL != cadetp)
526   {
527     GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
528     destroy_cadet_peer (cadetp);
529     cadetp = cadetp_head;
530   }
531   /* Disconnect from other services */
532   if (NULL != core)
533   {
534     GNUNET_CORE_disconnect (core);
535     core = NULL;
536   }
537   if (NULL != peerstore)
538   {
539     GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_NO);
540     peerstore = NULL;
541   }
542   if (NULL != cadet)
543   {
544     GNUNET_CADET_disconnect (cadet);
545     cadet = NULL;
546   }
547 }
548
549
550 /******************************************************************************/
551 /******************************      HELPERS     ******************************/
552 /******************************************************************************/
553
554
555 /**
556  * Gets the anomaly info struct related to the given sensor
557  *
558  * @param sensor Sensor to search by
559  */
560 static struct AnomalyInfo *
561 get_anomaly_info_by_sensor (struct GNUNET_SENSOR_SensorInfo *sensor)
562 {
563   struct AnomalyInfo *ai;
564
565   ai = ai_head;
566   while (NULL != ai)
567   {
568     if (ai->sensor == sensor)
569     {
570       return ai;
571     }
572     ai = ai->next;
573   }
574   return NULL;
575 }
576
577
578 /**
579  * Function called to notify a client about the connection
580  * begin ready to queue more data.  "buf" will be
581  * NULL and "size" zero if the connection was closed for
582  * writing in the meantime.
583  *
584  * @param cls closure
585  * @param size number of bytes available in buf
586  * @param buf where the callee should write the message
587  * @return number of bytes written to buf
588  */
589 static size_t
590 cp_mq_ntr (void *cls, size_t size, void *buf)
591 {
592   struct CadetPeer *cadetp = cls;
593   const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (cadetp->mq);
594   uint16_t msize;
595
596   LOG (GNUNET_ERROR_TYPE_DEBUG, "cp_mq_ntr()\n");
597   cadetp->th = NULL;
598   if (NULL == buf)
599   {
600     LOG (GNUNET_ERROR_TYPE_INFO,
601          "Sending anomaly report to collection point failed."
602          " Retrying connection in %s.\n",
603          GNUNET_STRINGS_relative_time_to_string (CP_RETRY, GNUNET_NO));
604     cadetp->reconnect_task =
605         GNUNET_SCHEDULER_add_delayed (CP_RETRY, &cp_reconnect, cadetp);
606     return 0;
607   }
608   msize = ntohs (msg->size);
609   GNUNET_assert (msize <= size);
610   memcpy (buf, msg, msize);
611   GNUNET_MQ_impl_send_continue (cadetp->mq);
612   return msize;
613 }
614
615
616 /**
617  * Try reconnecting to collection point and send last queued message
618  */
619 static void
620 cp_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
621 {
622   struct CadetPeer *cadetp = cls;
623   const struct GNUNET_MessageHeader *msg;
624
625   LOG (GNUNET_ERROR_TYPE_INFO,
626        "Retrying connection to collection point `%s'.\n",
627        GNUNET_i2s (&cadetp->peer_id));
628   cadetp->reconnect_task = NULL;
629   GNUNET_assert (NULL == cadetp->channel);
630   cadetp->channel =
631       GNUNET_CADET_channel_create (cadet, cadetp, &cadetp->peer_id,
632                                    GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
633                                    GNUNET_CADET_OPTION_RELIABLE);
634   msg = GNUNET_MQ_impl_current (cadetp->mq);
635   cadetp->th =
636       GNUNET_CADET_notify_transmit_ready (cadetp->channel, GNUNET_NO,
637                                           GNUNET_TIME_UNIT_FOREVER_REL,
638                                           ntohs (msg->size), cp_mq_ntr, cadetp);
639 }
640
641
642 /**
643  * Signature of functions implementing the
644  * sending functionality of a message queue.
645  *
646  * @param mq the message queue
647  * @param msg the message to send
648  * @param impl_state state of the implementation
649  */
650 static void
651 cp_mq_send_impl (struct GNUNET_MQ_Handle *mq,
652                  const struct GNUNET_MessageHeader *msg, void *impl_state)
653 {
654   struct CadetPeer *cadetp = impl_state;
655
656   LOG (GNUNET_ERROR_TYPE_DEBUG, "cp_mq_send_impl()\n");
657   GNUNET_assert (NULL == cadetp->th);
658   if (NULL == cadetp->channel)
659   {
660     LOG (GNUNET_ERROR_TYPE_INFO,
661          "Sending anomaly report to collection point failed."
662          " Retrying connection in %s.\n",
663          GNUNET_STRINGS_relative_time_to_string (CP_RETRY, GNUNET_NO));
664     cadetp->reconnect_task =
665         GNUNET_SCHEDULER_add_delayed (CP_RETRY, &cp_reconnect, cadetp);
666     return;
667   }
668   cadetp->th =
669       GNUNET_CADET_notify_transmit_ready (cadetp->channel, GNUNET_NO,
670                                           GNUNET_TIME_UNIT_FOREVER_REL,
671                                           ntohs (msg->size), cp_mq_ntr, cadetp);
672 }
673
674
675 /**
676  * Signature of functions implementing the
677  * destruction of a message queue.
678  * Implementations must not free 'mq', but should
679  * take care of 'impl_state'.
680  *
681  * @param mq the message queue to destroy
682  * @param impl_state state of the implementation
683  */
684 static void
685 cp_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
686 {
687   struct CadetPeer *cp = impl_state;
688
689   LOG (GNUNET_ERROR_TYPE_DEBUG, "cp_mq_destroy_impl()\n");
690   if (NULL != cp->th)
691   {
692     GNUNET_CADET_notify_transmit_ready_cancel (cp->th);
693     cp->th = NULL;
694   }
695 }
696
697
698 /**
699  * Create the message queue used to send messages to a collection point.
700  * This will be used to make sure that the message are queued even if the
701  * connection to the collection point can not be established at the moment.
702  *
703  * @param cp CadetPeer information struct
704  * @return Message queue handle
705  */
706 static struct GNUNET_MQ_Handle *
707 cp_mq_create (struct CadetPeer *cp)
708 {
709   return GNUNET_MQ_queue_for_callbacks (cp_mq_send_impl, cp_mq_destroy_impl,
710                                         NULL, cp, NULL, NULL, NULL);
711 }
712
713
714 /**
715  * Returns context of a connected CADET peer.
716  * Creates it first if didn't exist before.
717  *
718  * @param pid Peer Identity
719  * @return Context of connected CADET peer
720  */
721 static struct CadetPeer *
722 get_cadet_peer (struct GNUNET_PeerIdentity pid)
723 {
724   struct CadetPeer *cadetp;
725
726   cadetp = cadetp_head;
727   while (NULL != cadetp)
728   {
729     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cadetp->peer_id))
730       return cadetp;
731     cadetp = cadetp->next;
732   }
733   LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating a CADET connection to peer `%s'.\n",
734        GNUNET_i2s (&pid));
735   /* Not found, create struct and channel */
736   cadetp = GNUNET_new (struct CadetPeer);
737   cadetp->peer_id = pid;
738   cadetp->channel =
739       GNUNET_CADET_channel_create (cadet, cadetp, &pid,
740                                    GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
741                                    GNUNET_CADET_OPTION_RELIABLE);
742   cadetp->mq = cp_mq_create (cadetp);
743   cadetp->reconnect_task = NULL;
744   GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp);
745   return cadetp;
746 }
747
748
749 /**
750  * This function is called only when we have a block ready and want to send it
751  * to the given peer (represented by its message queue)
752  *
753  * @param mq Message queue to put the message in
754  * @param ai Anomaly info to report
755  * @param type Message type
756  */
757 static void
758 do_send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai,
759                         int type)
760 {
761   struct GNUNET_MessageHeader *msg;
762   struct GNUNET_MQ_Envelope *ev;
763   size_t block_size;
764
765   GNUNET_assert (NULL != ai->report_block);
766   block_size =
767       sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
768       ai->report_block->msg_size;
769   ev = GNUNET_MQ_msg_header_extra (msg, block_size, type);
770   memcpy (&msg[1], ai->report_block, block_size);
771   GNUNET_MQ_send (mq, ev);
772 }
773
774
775 /**
776  * Check if we have signed and proof-of-work block ready.
777  * If yes, we send the report directly, if no, we enqueue the reporting until
778  * the block is ready.
779  *
780  * @param mq Message queue to put the message in
781  * @param ai Anomaly info to report
782  * @param p2p Is the report sent to a neighboring peer
783  */
784 static void
785 send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai,
786                      int p2p)
787 {
788   struct AnomalyReportingQueueItem *ar_item;
789   int type;
790
791   type =
792       (GNUNET_YES ==
793        p2p) ? GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P :
794       GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT;
795   if (NULL == ai->report_block)
796   {
797     ar_item = GNUNET_new (struct AnomalyReportingQueueItem);
798
799     ar_item->dest_mq = mq;
800     ar_item->type = type;
801     GNUNET_CONTAINER_DLL_insert_tail (ai->reporting_queue_head,
802                                       ai->reporting_queue_tail, ar_item);
803   }
804   else
805   {
806     do_send_anomaly_report (mq, ai, type);
807   }
808 }
809
810
811 /**
812  * Callback when the crypto module finished created proof-of-work and signature
813  * for an anomaly report.
814  *
815  * @param cls Closure, a `struct AnomalyInfo *`
816  * @param block The resulting block, NULL on error
817  */
818 static void
819 report_creation_cb (void *cls, struct GNUNET_SENSOR_crypto_pow_block *block)
820 {
821   struct AnomalyInfo *ai = cls;
822   struct AnomalyReportingQueueItem *ar_item;
823
824   ai->report_creation_cx = NULL;
825   if (NULL != ai->report_block)
826   {
827     LOG (GNUNET_ERROR_TYPE_ERROR,
828          _("Double creation of proof-of-work, this should not happen.\n"));
829     return;
830   }
831   if (NULL == block)
832   {
833     LOG (GNUNET_ERROR_TYPE_ERROR,
834          _("Failed to create pow and signature block.\n"));
835     return;
836   }
837   LOG (GNUNET_ERROR_TYPE_DEBUG, "Anomaly report POW block ready.\n");
838   ai->report_block =
839       GNUNET_memdup (block,
840                      sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
841                      block->msg_size);
842   ar_item = ai->reporting_queue_head;
843   while (NULL != ar_item)
844   {
845     GNUNET_CONTAINER_DLL_remove (ai->reporting_queue_head,
846                                  ai->reporting_queue_tail, ar_item);
847     do_send_anomaly_report (ar_item->dest_mq, ai, ar_item->type);
848     GNUNET_free (ar_item);
849     ar_item = ai->reporting_queue_head;
850   }
851 }
852
853
854 /**
855  * When a change to the anomaly info of a sensor is done, this function should
856  * be called to create the message, its proof-of-work and signuature ready to
857  * be sent to other peers or collection point.
858  *
859  * @param ai Anomaly Info struct
860  */
861 static void
862 update_anomaly_report_pow_block (struct AnomalyInfo *ai)
863 {
864   struct GNUNET_SENSOR_AnomalyReportMessage *arm;
865   struct GNUNET_TIME_Absolute timestamp;
866
867   LOG (GNUNET_ERROR_TYPE_DEBUG,
868        "Updating anomaly report POW block due to data change.\n");
869   if (NULL != ai->report_block)
870   {
871     GNUNET_free (ai->report_block);
872     ai->report_block = NULL;
873   }
874   if (NULL != ai->report_creation_cx)
875   {
876     /* If a creation is already running, cancel it because the data changed */
877     GNUNET_SENSOR_crypto_pow_sign_cancel (ai->report_creation_cx);
878     ai->report_creation_cx = NULL;
879   }
880   arm = GNUNET_new (struct GNUNET_SENSOR_AnomalyReportMessage);
881
882   GNUNET_CRYPTO_hash (ai->sensor->name, strlen (ai->sensor->name) + 1,
883                       &arm->sensorname_hash);
884   arm->sensorversion_major = htons (ai->sensor->version_major);
885   arm->sensorversion_minor = htons (ai->sensor->version_minor);
886   arm->anomalous = htons (ai->anomalous);
887   arm->anomalous_neighbors =
888       (0 ==
889        neighborhood) ? 0 : ((float)
890                             GNUNET_CONTAINER_multipeermap_size
891                             (ai->anomalous_neighbors)) / neighborhood;
892   timestamp = GNUNET_TIME_absolute_get ();
893   ai->report_creation_cx =
894       GNUNET_SENSOR_crypto_pow_sign (arm,
895                                      sizeof (struct
896                                              GNUNET_SENSOR_AnomalyReportMessage),
897                                      &timestamp, &mypeerid.public_key,
898                                      private_key, pow_matching_bits,
899                                      &report_creation_cb, ai);
900   GNUNET_free (arm);
901 }
902
903
904 /**
905  * Create a sensor value message from a given value info struct inside a MQ
906  * envelope.
907  *
908  * @param vi Value info struct to use
909  * @return Envelope with message
910  */
911 static struct GNUNET_MQ_Envelope *
912 create_value_message (struct ValueInfo *vi)
913 {
914   struct GNUNET_SENSOR_ValueMessage *vm;
915   struct GNUNET_MQ_Envelope *ev;
916
917   ev = GNUNET_MQ_msg_extra (vm, vi->last_value_size,
918                             GNUNET_MESSAGE_TYPE_SENSOR_READING);
919   GNUNET_CRYPTO_hash (vi->sensor->name, strlen (vi->sensor->name) + 1,
920                       &vm->sensorname_hash);
921   vm->sensorversion_major = htons (vi->sensor->version_major);
922   vm->sensorversion_minor = htons (vi->sensor->version_minor);
923   vm->timestamp = vi->last_value_timestamp;
924   vm->value_size = htons (vi->last_value_size);
925   memcpy (&vm[1], vi->last_value, vi->last_value_size);
926   return ev;
927 }
928
929
930 /******************************************************************************/
931 /***************************      CORE Handlers     ***************************/
932 /******************************************************************************/
933
934
935 /**
936  * An inbound anomaly report is received from a peer through CORE.
937  *
938  * @param cls closure (unused)
939  * @param peer the other peer involved
940  * @param message the actual message
941  * @return #GNUNET_OK to keep the connection open,
942  *         #GNUNET_SYSERR to close connection to the peer (signal serious error)
943  */
944 static int
945 handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other,
946                        const struct GNUNET_MessageHeader *message)
947 {
948   struct GNUNET_SENSOR_crypto_pow_block *report_block;
949   struct GNUNET_SENSOR_AnomalyReportMessage *arm;
950   struct GNUNET_SENSOR_SensorInfo *sensor;
951   struct AnomalyInfo *my_anomaly_info;
952   struct CadetPeer *cadetp;
953   int peer_anomalous;
954   int peer_in_anomalous_list;
955
956   /* Verify proof-of-work, signature and extract report message */
957   report_block = (struct GNUNET_SENSOR_crypto_pow_block *) &message[1];
958   if (sizeof (struct GNUNET_SENSOR_AnomalyReportMessage) !=
959       GNUNET_SENSOR_crypto_verify_pow_sign (report_block, pow_matching_bits,
960                                             (struct GNUNET_CRYPTO_EddsaPublicKey
961                                              *) &other->public_key,
962                                             (void **) &arm))
963   {
964     LOG (GNUNET_ERROR_TYPE_WARNING,
965          "Received invalid anomaly report from peer `%s'.\n",
966          GNUNET_i2s (other));
967     GNUNET_break_op (0);
968     return GNUNET_SYSERR;
969   }
970   /* Now we parse the content of the message */
971   sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &arm->sensorname_hash);
972   if (NULL == sensor ||
973       sensor->version_major != ntohs (arm->sensorversion_major) ||
974       sensor->version_minor != ntohs (arm->sensorversion_minor))
975   {
976     LOG (GNUNET_ERROR_TYPE_WARNING,
977          "I don't have the sensor reported by the peer `%s'.\n",
978          GNUNET_i2s (other));
979     return GNUNET_OK;
980   }
981   my_anomaly_info = get_anomaly_info_by_sensor (sensor);
982   GNUNET_assert (NULL != my_anomaly_info);
983   peer_in_anomalous_list =
984       GNUNET_CONTAINER_multipeermap_contains
985       (my_anomaly_info->anomalous_neighbors, other);
986   peer_anomalous = ntohs (arm->anomalous);
987   LOG (GNUNET_ERROR_TYPE_DEBUG,
988        "Received an anomaly update from neighbour `%s' (%d).\n",
989        GNUNET_i2s (other), peer_anomalous);
990   if (GNUNET_YES == peer_anomalous)
991   {
992     if (GNUNET_YES == peer_in_anomalous_list)   /* repeated positive report */
993       GNUNET_break_op (0);
994     else
995       GNUNET_CONTAINER_multipeermap_put (my_anomaly_info->anomalous_neighbors,
996                                          other, NULL,
997                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
998   }
999   else
1000   {
1001     if (GNUNET_NO == peer_in_anomalous_list)    /* repeated negative report */
1002       GNUNET_break_op (0);
1003     else
1004       GNUNET_CONTAINER_multipeermap_remove_all
1005           (my_anomaly_info->anomalous_neighbors, other);
1006   }
1007   /* This is important to create an updated block since the data changed */
1008   update_anomaly_report_pow_block (my_anomaly_info);
1009   /* Send anomaly update to collection point only if I have the same anomaly */
1010   if (GNUNET_YES == my_anomaly_info->anomalous &&
1011       NULL != sensor->collection_point &&
1012       GNUNET_YES == sensor->report_anomalies)
1013   {
1014     LOG (GNUNET_ERROR_TYPE_DEBUG,
1015          "Neighbor update triggered sending anomaly report to collection point `%s'.\n",
1016          GNUNET_i2s (sensor->collection_point));
1017     cadetp = get_cadet_peer (*sensor->collection_point);
1018     send_anomaly_report (cadetp->mq, my_anomaly_info, GNUNET_NO);
1019   }
1020   return GNUNET_OK;
1021 }
1022
1023
1024 /******************************************************************************/
1025 /************************      PEERSTORE callbacks     ************************/
1026 /******************************************************************************/
1027
1028
1029 /**
1030  * Sensor value watch callback
1031  *
1032  * @param cls Closure, ValueInfo struct related to the sensor we are watching
1033  * @param record PEERSTORE new record, NULL if error
1034  * @param emsg Error message, NULL if no error
1035  * @return #GNUNET_YES to continue watching
1036  */
1037 static int
1038 value_watch_cb (void *cls,
1039                 const struct GNUNET_PEERSTORE_Record *record,
1040                 const char *emsg)
1041 {
1042   struct ValueInfo *vi = cls;
1043
1044   if (NULL != emsg)
1045   {
1046     LOG (GNUNET_ERROR_TYPE_ERROR, _("PEERSTORE error: %s.\n"), emsg);
1047     return GNUNET_YES;
1048   }
1049   if (NULL != vi->last_value)
1050   {
1051     GNUNET_free (vi->last_value);
1052     vi->last_value_size = 0;
1053   }
1054   vi->last_value = GNUNET_memdup (record->value, record->value_size);
1055   vi->last_value_size = record->value_size;
1056   vi->last_value_timestamp = GNUNET_TIME_absolute_get ();
1057   vi->last_value_reported = GNUNET_NO;
1058   return GNUNET_YES;
1059 }
1060
1061
1062 /******************************************************************************/
1063 /**************************      CORE callbacks     ***************************/
1064 /******************************************************************************/
1065
1066
1067 /**
1068  * Method called whenever a CORE peer disconnects.
1069  *
1070  * @param cls closure (unused)
1071  * @param peer peer identity this notification is about
1072  */
1073 static void
1074 core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
1075 {
1076   struct CorePeer *corep;
1077
1078   if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
1079     return;
1080   LOG (GNUNET_ERROR_TYPE_DEBUG, "Core peer `%s' disconnected.\n",
1081        GNUNET_i2s (peer));
1082   neighborhood--;
1083   corep = corep_head;
1084   while (NULL != corep)
1085   {
1086     if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, corep->peer_id))
1087     {
1088       GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
1089       destroy_core_peer (corep);
1090       return;
1091     }
1092     corep = corep->next;
1093   }
1094 }
1095
1096
1097 /**
1098  * Method called whenever a given peer connects through CORE.
1099  *
1100  * @param cls closure (unused)
1101  * @param peer peer identity this notification is about
1102  */
1103 static void
1104 core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
1105 {
1106   struct CorePeer *corep;
1107   struct AnomalyInfo *ai;
1108
1109   if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
1110     return;
1111   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core peer `%s'.\n",
1112        GNUNET_i2s (peer));
1113   neighborhood++;
1114   corep = GNUNET_new (struct CorePeer);
1115   corep->peer_id = (struct GNUNET_PeerIdentity *) peer;
1116   corep->mq = GNUNET_CORE_mq_create (core, peer);
1117   GNUNET_CONTAINER_DLL_insert (corep_head, corep_tail, corep);
1118   /* Send any locally anomalous sensors to the new peer */
1119   ai = ai_head;
1120   while (NULL != ai)
1121   {
1122     if (GNUNET_YES == ai->anomalous)
1123     {
1124       LOG (GNUNET_ERROR_TYPE_DEBUG,
1125            "Updating newly connected neighbor `%s' with anomalous sensor.\n",
1126            GNUNET_i2s (peer));
1127       send_anomaly_report (corep->mq, ai, GNUNET_YES);
1128     }
1129     ai = ai->next;
1130   }
1131 }
1132
1133
1134 /**
1135  * Function called after #GNUNET_CORE_connect has succeeded (or failed
1136  * for good).  Note that the private key of the peer is intentionally
1137  * not exposed here; if you need it, your process should try to read
1138  * the private key file directly (which should work if you are
1139  * authorized...).  Implementations of this function must not call
1140  * #GNUNET_CORE_disconnect (other than by scheduling a new task to
1141  * do this later).
1142  *
1143  * @param cls closure (unused)
1144  * @param my_identity ID of this peer, NULL if we failed
1145  */
1146 static void
1147 core_startup_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity)
1148 {
1149   if (NULL == my_identity)
1150   {
1151     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
1152     SENSOR_reporting_stop ();
1153     return;
1154   }
1155   if (0 != GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, my_identity))
1156   {
1157     LOG (GNUNET_ERROR_TYPE_ERROR,
1158          _("Peer identity received from CORE init doesn't match ours.\n"));
1159     SENSOR_reporting_stop ();
1160     return;
1161   }
1162 }
1163
1164
1165 /******************************************************************************/
1166 /*************************      CADET callbacks     ***************************/
1167 /******************************************************************************/
1168
1169 /**
1170  * Function called whenever a channel is destroyed.  Should clean up
1171  * any associated state.
1172  *
1173  * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
1174  *
1175  * @param cls closure (set from #GNUNET_CADET_connect)
1176  * @param channel connection to the other end (henceforth invalid)
1177  * @param channel_ctx place where local state associated
1178  *                   with the channel is stored
1179  */
1180 static void
1181 cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel,
1182                          void *channel_ctx)
1183 {
1184   struct CadetPeer *cadetp = channel_ctx;
1185
1186   if (GNUNET_YES == cadetp->destroying)
1187     return;
1188   LOG (GNUNET_ERROR_TYPE_DEBUG,
1189        "CADET channel was destroyed by remote peer `%s' or failed to start.\n",
1190        GNUNET_i2s (&cadetp->peer_id));
1191   if (NULL != cadetp->th)
1192   {
1193     GNUNET_CADET_notify_transmit_ready_cancel (cadetp->th);
1194     cadetp->th = NULL;
1195   }
1196   cadetp->channel = NULL;
1197 }
1198
1199
1200 /******************************************************************************/
1201 /**********************      Local anomaly receiver     ***********************/
1202 /******************************************************************************/
1203
1204
1205 /**
1206  * Used by the analysis module to tell the reporting module about a change in
1207  * the anomaly status of a sensor.
1208  *
1209  * @param sensor Related sensor
1210  * @param anomalous The new sensor anomalous status
1211  */
1212 void
1213 SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor,
1214                                  int anomalous)
1215 {
1216   struct AnomalyInfo *ai;
1217   struct CorePeer *corep;
1218   struct CadetPeer *cadetp;
1219
1220   if (GNUNET_NO == module_running)
1221     return;
1222   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received an external anomaly update.\n");
1223   ai = get_anomaly_info_by_sensor (sensor);
1224   GNUNET_assert (NULL != ai);
1225   ai->anomalous = anomalous;
1226   /* This is important to create an updated block since the data changed */
1227   update_anomaly_report_pow_block (ai);
1228   /* Report change to all neighbors */
1229   corep = corep_head;
1230   while (NULL != corep)
1231   {
1232     LOG (GNUNET_ERROR_TYPE_DEBUG,
1233          "Sending an anomaly report to neighbor `%s'.\n",
1234          GNUNET_i2s (corep->peer_id));
1235     send_anomaly_report (corep->mq, ai, GNUNET_YES);
1236     corep = corep->next;
1237   }
1238   /* Report change to collection point if need */
1239   if (NULL != ai->sensor->collection_point &&
1240       GNUNET_YES == ai->sensor->report_anomalies)
1241   {
1242     LOG (GNUNET_ERROR_TYPE_DEBUG,
1243          "Local anomaly update triggered sending anomaly report to collection point `%s'.\n",
1244          GNUNET_i2s (ai->sensor->collection_point));
1245     cadetp = get_cadet_peer (*ai->sensor->collection_point);
1246     send_anomaly_report (cadetp->mq, ai, GNUNET_NO);
1247   }
1248 }
1249
1250
1251 /******************************************************************************/
1252 /*******************      Reporting values (periodic)     *********************/
1253 /******************************************************************************/
1254
1255
1256 /**
1257  * Task scheduled to send values to collection point
1258  *
1259  * @param cls closure, a `struct ValueReportingContext *`
1260  * @param tc unused
1261  */
1262 static void
1263 report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1264 {
1265   struct ValueInfo *vi = cls;
1266   struct GNUNET_SENSOR_SensorInfo *sensor = vi->sensor;
1267   struct CadetPeer *cadetp;
1268   struct GNUNET_MQ_Envelope *ev;
1269
1270   vi->reporting_task =
1271       GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
1272                                     &report_value, vi);
1273   if (0 == vi->last_value_size || GNUNET_YES == vi->last_value_reported)
1274   {
1275     LOG (GNUNET_ERROR_TYPE_WARNING,
1276          "Did not receive a fresh value from `%s' to report.\n", sensor->name);
1277     return;
1278   }
1279   LOG (GNUNET_ERROR_TYPE_DEBUG,
1280        "Now trying to report last seen value of `%s' to collection point.\n",
1281        sensor->name);
1282   cadetp = get_cadet_peer (*sensor->collection_point);
1283   if (NULL == cadetp->channel)
1284   {
1285     LOG (GNUNET_ERROR_TYPE_WARNING,
1286          "Trying to send value to collection point but connection failed, discarding.\n");
1287     return;
1288   }
1289   ev = create_value_message (vi);
1290   GNUNET_MQ_send (cadetp->mq, ev);
1291   vi->last_value_reported = GNUNET_YES;
1292 }
1293
1294
1295 /******************************************************************************/
1296 /********************************      INIT     *******************************/
1297 /******************************************************************************/
1298
1299
1300 /**
1301  * Iterator for defined sensors and creates anomaly info context
1302  *
1303  * @param cls unused
1304  * @param key unused
1305  * @param value a `struct GNUNET_SENSOR_SensorInfo *` with sensor information
1306  * @return #GNUNET_YES to continue iterations
1307  */
1308 static int
1309 init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
1310                        void *value)
1311 {
1312   struct GNUNET_SENSOR_SensorInfo *sensor = value;
1313   struct AnomalyInfo *ai;
1314   struct ValueInfo *vi;
1315
1316   /* Create sensor anomaly info context */
1317   ai = GNUNET_new (struct AnomalyInfo);
1318
1319   ai->sensor = sensor;
1320   ai->anomalous = GNUNET_NO;
1321   ai->anomalous_neighbors =
1322       GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
1323   ai->report_block = NULL;
1324   ai->report_creation_cx = NULL;
1325   GNUNET_CONTAINER_DLL_insert (ai_head, ai_tail, ai);
1326   /* Create sensor value info context (if needed to be reported) */
1327   if (NULL == sensor->collection_point || GNUNET_NO == sensor->report_values)
1328     return GNUNET_YES;
1329   LOG (GNUNET_ERROR_TYPE_INFO,
1330        "Reporting sensor `%s' values to collection point `%s' every %s.\n",
1331        sensor->name, GNUNET_i2s_full (sensor->collection_point),
1332        GNUNET_STRINGS_relative_time_to_string (sensor->value_reporting_interval,
1333                                                GNUNET_YES));
1334   vi = GNUNET_new (struct ValueInfo);
1335   vi->sensor = sensor;
1336   vi->last_value = NULL;
1337   vi->last_value_size = 0;
1338   vi->last_value_reported = GNUNET_NO;
1339   vi->wc =
1340       GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
1341                               &value_watch_cb, vi);
1342   vi->reporting_task =
1343       GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
1344                                     &report_value, vi);
1345   GNUNET_CONTAINER_DLL_insert (vi_head, vi_tail, vi);
1346   return GNUNET_YES;
1347 }
1348
1349
1350 /**
1351  * Start the sensor anomaly reporting module
1352  *
1353  * @param c our service configuration
1354  * @param s multihashmap of loaded sensors
1355  * @return #GNUNET_OK if started successfully, #GNUNET_SYSERR otherwise
1356  */
1357 int
1358 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
1359                         struct GNUNET_CONTAINER_MultiHashMap *s)
1360 {
1361   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
1362     {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P,
1363      sizeof (struct GNUNET_MessageHeader) +
1364      sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
1365      sizeof (struct GNUNET_SENSOR_AnomalyReportMessage)},
1366     {NULL, 0, 0}
1367   };
1368   static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1369     {NULL, 0, 0}
1370   };
1371
1372   LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n");
1373   GNUNET_assert (NULL != s);
1374   sensors = s;
1375   cfg = c;
1376   if (GNUNET_OK !=
1377       GNUNET_CONFIGURATION_get_value_number (cfg, "sensor-reporting",
1378                                              "POW_MATCHING_BITS",
1379                                              &pow_matching_bits))
1380   {
1381     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "sensor-reporting",
1382                                "POW_MATCHING_BITS");
1383     SENSOR_reporting_stop ();
1384     return GNUNET_SYSERR;
1385   }
1386   if (pow_matching_bits > sizeof (struct GNUNET_HashCode))
1387   {
1388     LOG (GNUNET_ERROR_TYPE_ERROR, "Matching bits value too large (%d > %d).\n",
1389          pow_matching_bits, sizeof (struct GNUNET_HashCode));
1390     SENSOR_reporting_stop ();
1391     return GNUNET_SYSERR;
1392   }
1393   /* Connect to PEERSTORE */
1394   peerstore = GNUNET_PEERSTORE_connect (cfg);
1395   if (NULL == peerstore)
1396   {
1397     LOG (GNUNET_ERROR_TYPE_ERROR,
1398          _("Failed to connect to peerstore service.\n"));
1399     SENSOR_reporting_stop ();
1400     return GNUNET_SYSERR;
1401   }
1402   /* Connect to CORE */
1403   core =
1404       GNUNET_CORE_connect (cfg, NULL, &core_startup_cb, core_connect_cb,
1405                            &core_disconnect_cb, NULL, GNUNET_YES, NULL,
1406                            GNUNET_YES, core_handlers);
1407   if (NULL == core)
1408   {
1409     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CORE service.\n"));
1410     SENSOR_reporting_stop ();
1411     return GNUNET_SYSERR;
1412   }
1413   /* Connect to CADET */
1414   cadet =
1415       GNUNET_CADET_connect (cfg, NULL, NULL, &cadet_channel_destroyed,
1416                             cadet_handlers, NULL);
1417   if (NULL == cadet)
1418   {
1419     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to connect to CADET service.\n"));
1420     SENSOR_reporting_stop ();
1421     return GNUNET_SYSERR;
1422   }
1423   private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (cfg);
1424   if (NULL == private_key)
1425   {
1426     LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to load my private key.\n"));
1427     SENSOR_reporting_stop ();
1428     return GNUNET_SYSERR;
1429   }
1430   GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid);
1431   GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL);
1432   neighborhood = 0;
1433   module_running = GNUNET_YES;
1434   return GNUNET_OK;
1435 }
1436
1437 /* end of gnunet-service-sensor_reporting.c */