sensor: minor fixes
[oweals/gnunet.git] / src / sensor / gnunet-service-sensor_reporting.c
index 15edd149a72a303d6293d86729ff4843510362d1..104c669459c39fdae35f7e315743728a54079333 100644 (file)
 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
 
 
+/**
+ * When we are still generating a proof-of-work and we need to send an anomaly
+ * report, we queue them until the generation is complete
+ */
+struct AnomalyReportingQueueItem
+{
+
+  /**
+   * DLL
+   */
+  struct AnomalyReportingQueueItem *prev;
+
+  /**
+   * DLL
+   */
+  struct AnomalyReportingQueueItem *next;
+
+  /**
+   * Message queue belonging to the peer that is the destination of the report
+   */
+  struct GNUNET_MQ_Handle *dest_mq;
+
+  /**
+   * Report type
+   */
+  int type;
+
+};
+
 struct AnomalyInfo
 {
 
@@ -62,6 +91,26 @@ struct AnomalyInfo
    */
   struct GNUNET_CONTAINER_MultiPeerMap *anomalous_neighbors;
 
+  /**
+   * Report block with proof-of-work and signature
+   */
+  struct GNUNET_SENSOR_crypto_pow_block *report_block;
+
+  /**
+   * Context of an operation creating pow and signature
+   */
+  struct GNUNET_SENSOR_crypto_pow_context *report_creation_cx;
+
+  /**
+   * Head of the queue of pending report destinations
+   */
+  struct AnomalyReportingQueueItem *reporting_queue_head;
+
+  /**
+   * Head of the queue of pending report destinations
+   */
+  struct AnomalyReportingQueueItem *reporting_queue_tail;
+
 };
 
 struct ValueInfo
@@ -213,6 +262,11 @@ static struct GNUNET_CADET_Handle *cadet;
  */
 static struct GNUNET_PeerIdentity mypeerid;
 
+/**
+ * My private key
+ */
+static struct GNUNET_CRYPTO_EddsaPrivateKey *private_key;
+
 /**
  * Head of DLL of anomaly info structs
  */
@@ -263,6 +317,11 @@ static int module_running = GNUNET_NO;
  */
 static int neighborhood;
 
+/**
+ * Parameter that defines the complexity of the proof-of-work
+ */
+static long long unsigned int pow_matching_bits;
+
 
 
 /******************************************************************************/
@@ -277,8 +336,31 @@ static int neighborhood;
 static void
 destroy_anomaly_info (struct AnomalyInfo *ai)
 {
+  struct AnomalyReportingQueueItem *ar_item;
+
+  ar_item = ai->reporting_queue_head;
+  while (NULL != ar_item)
+  {
+    GNUNET_CONTAINER_DLL_remove (ai->reporting_queue_head,
+                                 ai->reporting_queue_tail, ar_item);
+    GNUNET_free (ar_item);
+    ar_item = ai->reporting_queue_head;
+  }
+  if (NULL != ai->report_creation_cx)
+  {
+    GNUNET_SENSOR_crypto_pow_sign_cancel (ai->report_creation_cx);
+    ai->report_creation_cx = NULL;
+  }
+  if (NULL != ai->report_block)
+  {
+    GNUNET_free (ai->report_block);
+    ai->report_block = NULL;
+  }
   if (NULL != ai->anomalous_neighbors)
+  {
     GNUNET_CONTAINER_multipeermap_destroy (ai->anomalous_neighbors);
+    ai->anomalous_neighbors = NULL;
+  }
   GNUNET_free (ai);
 }
 
@@ -472,13 +554,15 @@ get_cadet_peer (struct GNUNET_PeerIdentity pid)
       return cadetp;
     cadetp = cadetp->next;
   }
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating a CADET connection to peer `%s'.\n",
+       GNUNET_i2s (&pid));
   /* Not found, create struct and channel */
   cadetp = GNUNET_new (struct CadetPeer);
   cadetp->peer_id = pid;
   cadetp->channel =
       GNUNET_CADET_channel_create (cadet, cadetp, &pid,
                                    GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
-                                   GNUNET_CADET_OPTION_DEFAULT);
+                                   GNUNET_CADET_OPTION_RELIABLE);
   cadetp->mq = GNUNET_CADET_mq_create (cadetp->channel);
   GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp);
   return cadetp;
@@ -486,28 +570,157 @@ get_cadet_peer (struct GNUNET_PeerIdentity pid)
 
 
 /**
- * Create an anomaly report message from a given anomaly info struct inside a
- * MQ envelope.
+ * This function is called only when we have a block ready and want to send it
+ * to the given peer (represented by its message queue)
  *
- * @param ai Anomaly info struct to use
- * @return Envelope with message
+ * @param mq Message queue to put the message in
+ * @param ai Anomaly info to report
+ * @param type Message type
  */
-static struct GNUNET_MQ_Envelope *
-create_anomaly_report_message (struct AnomalyInfo *ai)
+static void
+do_send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai,
+                        int type)
 {
-  struct GNUNET_SENSOR_AnomalyReportMessage *arm;
+  struct GNUNET_MessageHeader *msg;
   struct GNUNET_MQ_Envelope *ev;
+  size_t block_size;
+
+  GNUNET_assert (NULL != ai->report_block);
+  block_size =
+      sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
+      ai->report_block->msg_size;
+  ev = GNUNET_MQ_msg_header_extra (msg, block_size, type);
+  memcpy (&msg[1], ai->report_block, block_size);
+  GNUNET_MQ_send (mq, ev);
+}
+
+
+/**
+ * Check if we have signed and proof-of-work block ready.
+ * If yes, we send the report directly, if no, we enqueue the reporting until
+ * the block is ready.
+ *
+ * @param mq Message queue to put the message in
+ * @param ai Anomaly info to report
+ * @param p2p Is the report sent to a neighboring peer
+ */
+static void
+send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai,
+                     int p2p)
+{
+  struct AnomalyReportingQueueItem *ar_item;
+  int type;
+
+  type =
+      (GNUNET_YES ==
+       p2p) ? GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P :
+      GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT;
+  if (NULL == ai->report_block)
+  {
+    ar_item = GNUNET_new (struct AnomalyReportingQueueItem);
+
+    ar_item->dest_mq = mq;
+    ar_item->type = type;
+    GNUNET_CONTAINER_DLL_insert_tail (ai->reporting_queue_head,
+                                      ai->reporting_queue_tail, ar_item);
+  }
+  else
+  {
+    do_send_anomaly_report (mq, ai, type);
+  }
+}
+
+
+/**
+ * Callback when the crypto module finished created proof-of-work and signature
+ * for an anomaly report.
+ *
+ * @param cls Closure, a `struct AnomalyInfo *`
+ * @param block The resulting block, NULL on error
+ */
+static void
+report_creation_cb (void *cls, struct GNUNET_SENSOR_crypto_pow_block *block)
+{
+  struct AnomalyInfo *ai = cls;
+  struct AnomalyReportingQueueItem *ar_item;
+
+  ai->report_creation_cx = NULL;
+  if (NULL != ai->report_block)
+  {
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         _("Double creation of proof-of-work, this should not happen.\n"));
+    return;
+  }
+  if (NULL == block)
+  {
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         _("Failed to create pow and signature block.\n"));
+    return;
+  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Anomaly report POW block ready.\n");
+  ai->report_block =
+      GNUNET_memdup (block,
+                     sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
+                     block->msg_size);
+  ar_item = ai->reporting_queue_head;
+  while (NULL != ar_item)
+  {
+    GNUNET_CONTAINER_DLL_remove (ai->reporting_queue_head,
+                                 ai->reporting_queue_tail, ar_item);
+    do_send_anomaly_report (ar_item->dest_mq, ai, ar_item->type);
+    GNUNET_free (ar_item);
+    ar_item = ai->reporting_queue_head;
+  }
+}
+
+
+/**
+ * When a change to the anomaly info of a sensor is done, this function should
+ * be called to create the message, its proof-of-work and signuature ready to
+ * be sent to other peers or collection point.
+ *
+ * @param ai Anomaly Info struct
+ */
+static void
+update_anomaly_report_pow_block (struct AnomalyInfo *ai)
+{
+  struct GNUNET_SENSOR_AnomalyReportMessage *arm;
+  struct GNUNET_TIME_Absolute timestamp;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Updating anomaly report POW block due to data change.\n");
+  if (NULL != ai->report_block)
+  {
+    GNUNET_free (ai->report_block);
+    ai->report_block = NULL;
+  }
+  if (NULL != ai->report_creation_cx)
+  {
+    /* If a creation is already running, cancel it because the data changed */
+    GNUNET_SENSOR_crypto_pow_sign_cancel (ai->report_creation_cx);
+    ai->report_creation_cx = NULL;
+  }
+  arm = GNUNET_new (struct GNUNET_SENSOR_AnomalyReportMessage);
 
-  ev = GNUNET_MQ_msg (arm, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT);
   GNUNET_CRYPTO_hash (ai->sensor->name, strlen (ai->sensor->name) + 1,
                       &arm->sensorname_hash);
   arm->sensorversion_major = htons (ai->sensor->version_major);
   arm->sensorversion_minor = htons (ai->sensor->version_minor);
   arm->anomalous = htons (ai->anomalous);
   arm->anomalous_neighbors =
-      ((float) GNUNET_CONTAINER_multipeermap_size (ai->anomalous_neighbors)) /
-      neighborhood;
-  return ev;
+      (0 ==
+       neighborhood) ? 0 : ((float) GNUNET_CONTAINER_multipeermap_size (ai->
+                                                                        anomalous_neighbors))
+      / neighborhood;
+  timestamp = GNUNET_TIME_absolute_get ();
+  ai->report_creation_cx =
+      GNUNET_SENSOR_crypto_pow_sign (arm,
+                                     sizeof (struct
+                                             GNUNET_SENSOR_AnomalyReportMessage),
+                                     &timestamp, &mypeerid.public_key,
+                                     private_key, pow_matching_bits,
+                                     &report_creation_cb, ai);
+  GNUNET_free (arm);
 }
 
 
@@ -524,7 +737,8 @@ create_value_message (struct ValueInfo *vi)
   struct GNUNET_SENSOR_ValueMessage *vm;
   struct GNUNET_MQ_Envelope *ev;
 
-  ev = GNUNET_MQ_msg_extra (vm, vi->last_value_size, GNUNET_MESSAGE_TYPE_SENSOR_READING);
+  ev = GNUNET_MQ_msg_extra (vm, vi->last_value_size,
+                            GNUNET_MESSAGE_TYPE_SENSOR_READING);
   GNUNET_CRYPTO_hash (vi->sensor->name, strlen (vi->sensor->name) + 1,
                       &vm->sensorname_hash);
   vm->sensorversion_major = htons (vi->sensor->version_major);
@@ -536,22 +750,6 @@ create_value_message (struct ValueInfo *vi)
 }
 
 
-/**
- * Send given anomaly info report by putting it in the given message queue.
- *
- * @param mq Message queue to put the message in
- * @param ai Anomaly info to report
- */
-static void
-send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai)
-{
-  struct GNUNET_MQ_Envelope *ev;
-
-  ev = create_anomaly_report_message (ai);
-  GNUNET_MQ_send (mq, ev);
-}
-
-
 /******************************************************************************/
 /***************************      CORE Handlers     ***************************/
 /******************************************************************************/
@@ -570,47 +768,77 @@ static int
 handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other,
                        const struct GNUNET_MessageHeader *message)
 {
+  struct GNUNET_SENSOR_crypto_pow_block *report_block;
   struct GNUNET_SENSOR_AnomalyReportMessage *arm;
   struct GNUNET_SENSOR_SensorInfo *sensor;
-  struct AnomalyInfo *ai;
+  struct AnomalyInfo *my_anomaly_info;
   struct CadetPeer *cadetp;
-  int peer_in_list;
-
-  arm = (struct GNUNET_SENSOR_AnomalyReportMessage *) message;
+  int peer_anomalous;
+  int peer_in_anomalous_list;
+
+  /* Verify proof-of-work, signature and extract report message */
+  report_block = (struct GNUNET_SENSOR_crypto_pow_block *) &message[1];
+  if (sizeof (struct GNUNET_SENSOR_AnomalyReportMessage) !=
+      GNUNET_SENSOR_crypto_verify_pow_sign (report_block, pow_matching_bits,
+                                            (struct GNUNET_CRYPTO_EddsaPublicKey
+                                             *) &other->public_key,
+                                            (void **) &arm))
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "Received invalid anomaly report from peer `%s'.\n",
+         GNUNET_i2s (other));
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  /* Now we parse the content of the message */
   sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &arm->sensorname_hash);
-  if (NULL == sensor || sensor->version_major != arm->sensorversion_major ||
-      sensor->version_minor != arm->sensorversion_minor)
+  if (NULL == sensor ||
+      sensor->version_major != ntohs (arm->sensorversion_major) ||
+      sensor->version_minor != ntohs (arm->sensorversion_minor))
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
          "I don't have the sensor reported by the peer `%s'.\n",
          GNUNET_i2s (other));
     return GNUNET_OK;
   }
-  ai = get_anomaly_info_by_sensor (sensor);
-  GNUNET_assert (NULL != ai);
-  peer_in_list =
-      GNUNET_CONTAINER_multipeermap_contains (ai->anomalous_neighbors, other);
-  if (GNUNET_YES == ai->anomalous)
+  my_anomaly_info = get_anomaly_info_by_sensor (sensor);
+  GNUNET_assert (NULL != my_anomaly_info);
+  peer_in_anomalous_list =
+      GNUNET_CONTAINER_multipeermap_contains (my_anomaly_info->
+                                              anomalous_neighbors, other);
+  peer_anomalous = ntohs (arm->anomalous);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received an anomaly update from neighbour `%s' (%d).\n",
+       GNUNET_i2s (other), peer_anomalous);
+  if (GNUNET_YES == peer_anomalous)
   {
-    if (GNUNET_YES == peer_in_list)
+    if (GNUNET_YES == peer_in_anomalous_list)   /* repeated positive report */
       GNUNET_break_op (0);
     else
-      GNUNET_CONTAINER_multipeermap_put (ai->anomalous_neighbors, other, NULL,
+      GNUNET_CONTAINER_multipeermap_put (my_anomaly_info->anomalous_neighbors,
+                                         other, NULL,
                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
   }
   else
   {
-    if (GNUNET_NO == peer_in_list)
+    if (GNUNET_NO == peer_in_anomalous_list)    /* repeated negative report */
       GNUNET_break_op (0);
     else
-      GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors, other);
+      GNUNET_CONTAINER_multipeermap_remove_all (my_anomaly_info->
+                                                anomalous_neighbors, other);
   }
-  /* Send anomaly update to collection point */
-  if (NULL != ai->sensor->collection_point &&
-        GNUNET_YES == ai->sensor->report_anomalies)
+  /* This is important to create an updated block since the data changed */
+  update_anomaly_report_pow_block (my_anomaly_info);
+  /* Send anomaly update to collection point only if I have the same anomaly */
+  if (GNUNET_YES == my_anomaly_info->anomalous &&
+      NULL != sensor->collection_point &&
+      GNUNET_YES == sensor->report_anomalies)
   {
-    cadetp = get_cadet_peer (*ai->sensor->collection_point);
-    send_anomaly_report (cadetp->mq, ai);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Neighbor update triggered sending anomaly report to collection point `%s'.\n",
+         GNUNET_i2s (sensor->collection_point));
+    cadetp = get_cadet_peer (*sensor->collection_point);
+    send_anomaly_report (cadetp->mq, my_anomaly_info, GNUNET_NO);
   }
   return GNUNET_OK;
 }
@@ -636,8 +864,7 @@ value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
 
   if (NULL != emsg)
   {
-    LOG (GNUNET_ERROR_TYPE_ERROR,
-        _("PEERSTORE error: %s.\n"), emsg);
+    LOG (GNUNET_ERROR_TYPE_ERROR, _("PEERSTORE error: %s.\n"), emsg);
     return GNUNET_YES;
   }
   if (NULL != vi->last_value)
@@ -647,7 +874,7 @@ value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
   }
   vi->last_value = GNUNET_memdup (record->value, record->value_size);
   vi->last_value_size = record->value_size;
-  vi->last_value_timestamp = GNUNET_TIME_absolute_get();
+  vi->last_value_timestamp = GNUNET_TIME_absolute_get ();
   vi->last_value_reported = GNUNET_NO;
   return GNUNET_YES;
 }
@@ -671,11 +898,13 @@ core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
 
   if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
     return;
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Core peer `%s' disconnected.\n",
+       GNUNET_i2s (peer));
   neighborhood--;
   corep = corep_head;
   while (NULL != corep)
   {
-    if (peer == corep->peer_id)
+    if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, corep->peer_id))
     {
       GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
       destroy_core_peer (corep);
@@ -683,9 +912,6 @@ core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
     }
     corep = corep->next;
   }
-  LOG (GNUNET_ERROR_TYPE_ERROR,
-       _("Received disconnect notification from CORE"
-         " for a peer we didn't know about.\n"));
 }
 
 
@@ -703,6 +929,8 @@ core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
 
   if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
     return;
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core peer `%s'.\n",
+       GNUNET_i2s (peer));
   neighborhood++;
   corep = GNUNET_new (struct CorePeer);
   corep->peer_id = (struct GNUNET_PeerIdentity *) peer;
@@ -713,7 +941,12 @@ core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
   while (NULL != ai)
   {
     if (GNUNET_YES == ai->anomalous)
-      send_anomaly_report (corep->mq, ai);
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Updating newly connected neighbor `%s' with anomalous sensor.\n",
+           GNUNET_i2s (peer));
+      send_anomaly_report (corep->mq, ai, GNUNET_YES);
+    }
     ai = ai->next;
   }
 }
@@ -772,7 +1005,10 @@ cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel,
   struct CadetPeer *cadetp = channel_ctx;
 
   if (GNUNET_YES == cadetp->destroying)
-      return;
+    return;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "CADET channel was destroyed by remote peer `%s' or failed to start.\n",
+       GNUNET_i2s (&cadetp->peer_id));
   GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
   cadetp->channel = NULL;
   destroy_cadet_peer (cadetp);
@@ -801,21 +1037,31 @@ SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor,
 
   if (GNUNET_NO == module_running)
     return;
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Received an external anomaly update.\n");
   ai = get_anomaly_info_by_sensor (sensor);
   GNUNET_assert (NULL != ai);
   ai->anomalous = anomalous;
+  /* This is important to create an updated block since the data changed */
+  update_anomaly_report_pow_block (ai);
   /* Report change to all neighbors */
   corep = corep_head;
   while (NULL != corep)
   {
-    send_anomaly_report (corep->mq, ai);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Sending an anomaly report to neighbor `%s'.\n",
+         GNUNET_i2s (corep->peer_id));
+    send_anomaly_report (corep->mq, ai, GNUNET_YES);
     corep = corep->next;
   }
+  /* Report change to collection point if need */
   if (NULL != ai->sensor->collection_point &&
       GNUNET_YES == ai->sensor->report_anomalies)
   {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Local anomaly update triggered sending anomaly report to collection point `%s'.\n",
+         GNUNET_i2s (ai->sensor->collection_point));
     cadetp = get_cadet_peer (*ai->sensor->collection_point);
-    send_anomaly_report (cadetp->mq, ai);
+    send_anomaly_report (cadetp->mq, ai, GNUNET_NO);
   }
 }
 
@@ -842,12 +1088,10 @@ report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   vi->reporting_task =
       GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
                                     &report_value, vi);
-  if (0 == vi->last_value_size ||
-      GNUNET_YES == vi->last_value_reported)
+  if (0 == vi->last_value_size || GNUNET_YES == vi->last_value_reported)
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
-         "Did not receive a fresh value from `%s' to report.\n",
-         sensor->name);
+         "Did not receive a fresh value from `%s' to report.\n", sensor->name);
     return;
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -883,10 +1127,13 @@ init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
 
   /* Create sensor anomaly info context */
   ai = GNUNET_new (struct AnomalyInfo);
+
   ai->sensor = sensor;
   ai->anomalous = GNUNET_NO;
   ai->anomalous_neighbors =
       GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
+  ai->report_block = NULL;
+  ai->report_creation_cx = NULL;
   GNUNET_CONTAINER_DLL_insert (ai_head, ai_tail, ai);
   /* Create sensor value info context (if needed to be reported) */
   if (NULL == sensor->collection_point || GNUNET_NO == sensor->report_values)
@@ -902,11 +1149,11 @@ init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
   vi->last_value_size = 0;
   vi->last_value_reported = GNUNET_NO;
   vi->wc =
-    GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
-                            &value_watch_cb, vi);
+      GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
+                              &value_watch_cb, vi);
   vi->reporting_task =
-    GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
-                                  &report_value, vi);
+      GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
+                                    &report_value, vi);
   GNUNET_CONTAINER_DLL_insert (vi_head, vi_tail, vi);
   return GNUNET_YES;
 }
@@ -921,10 +1168,12 @@ init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
  */
 int
 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
-                                struct GNUNET_CONTAINER_MultiHashMap *s)
+                        struct GNUNET_CONTAINER_MultiHashMap *s)
 {
   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
-    {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT,
+    {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P,
+     sizeof (struct GNUNET_MessageHeader) +
+     sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
      sizeof (struct GNUNET_SENSOR_AnomalyReportMessage)},
     {NULL, 0, 0}
   };
@@ -936,6 +1185,23 @@ SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
   GNUNET_assert (NULL != s);
   sensors = s;
   cfg = c;
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_number (cfg, "sensor-reporting",
+                                             "POW_MATCHING_BITS",
+                                             &pow_matching_bits))
+  {
+    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "sensor-reporting",
+                               "POW_MATCHING_BITS");
+    SENSOR_reporting_stop ();
+    return GNUNET_SYSERR;
+  }
+  if (pow_matching_bits > sizeof (struct GNUNET_HashCode))
+  {
+    LOG (GNUNET_ERROR_TYPE_ERROR, "Matching bits value too large (%d > %d).\n",
+         pow_matching_bits, sizeof (struct GNUNET_HashCode));
+    SENSOR_reporting_stop ();
+    return GNUNET_SYSERR;
+  }
   /* Connect to PEERSTORE */
   peerstore = GNUNET_PEERSTORE_connect (cfg);
   if (NULL == peerstore)
@@ -966,6 +1232,13 @@ SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
     SENSOR_reporting_stop ();
     return GNUNET_SYSERR;
   }
+  private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (cfg);
+  if (NULL == private_key)
+  {
+    LOG (GNUNET_ERROR_TYPE_ERROR, _("Failed to load my private key.\n"));
+    SENSOR_reporting_stop ();
+    return GNUNET_SYSERR;
+  }
   GNUNET_CRYPTO_get_peer_identity (cfg, &mypeerid);
   GNUNET_CONTAINER_multihashmap_iterate (sensors, &init_sensor_reporting, NULL);
   neighborhood = 0;