sensor: fixes for proof-of-work, test passes now
[oweals/gnunet.git] / src / sensor / gnunet-service-sensor_reporting.c
index 15edd149a72a303d6293d86729ff4843510362d1..562768ff1effc15d97b4509accfb9acced3f02be 100644 (file)
@@ -472,13 +472,15 @@ get_cadet_peer (struct GNUNET_PeerIdentity pid)
       return cadetp;
     cadetp = cadetp->next;
   }
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating a CADET connection to peer `%s'.\n",
+       GNUNET_i2s (&pid));
   /* Not found, create struct and channel */
   cadetp = GNUNET_new (struct CadetPeer);
   cadetp->peer_id = pid;
   cadetp->channel =
       GNUNET_CADET_channel_create (cadet, cadetp, &pid,
                                    GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
-                                   GNUNET_CADET_OPTION_DEFAULT);
+                                   GNUNET_CADET_OPTION_RELIABLE);
   cadetp->mq = GNUNET_CADET_mq_create (cadetp->channel);
   GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp);
   return cadetp;
@@ -490,23 +492,26 @@ get_cadet_peer (struct GNUNET_PeerIdentity pid)
  * MQ envelope.
  *
  * @param ai Anomaly info struct to use
+ * @param type Message type
  * @return Envelope with message
  */
 static struct GNUNET_MQ_Envelope *
-create_anomaly_report_message (struct AnomalyInfo *ai)
+create_anomaly_report_message (struct AnomalyInfo *ai, int type)
 {
   struct GNUNET_SENSOR_AnomalyReportMessage *arm;
   struct GNUNET_MQ_Envelope *ev;
 
-  ev = GNUNET_MQ_msg (arm, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT);
+  ev = GNUNET_MQ_msg (arm, type);
   GNUNET_CRYPTO_hash (ai->sensor->name, strlen (ai->sensor->name) + 1,
                       &arm->sensorname_hash);
   arm->sensorversion_major = htons (ai->sensor->version_major);
   arm->sensorversion_minor = htons (ai->sensor->version_minor);
   arm->anomalous = htons (ai->anomalous);
   arm->anomalous_neighbors =
-      ((float) GNUNET_CONTAINER_multipeermap_size (ai->anomalous_neighbors)) /
-      neighborhood;
+      (0 ==
+       neighborhood) ? 0 : ((float)
+                            GNUNET_CONTAINER_multipeermap_size
+                            (ai->anomalous_neighbors)) / neighborhood;
   return ev;
 }
 
@@ -524,7 +529,8 @@ create_value_message (struct ValueInfo *vi)
   struct GNUNET_SENSOR_ValueMessage *vm;
   struct GNUNET_MQ_Envelope *ev;
 
-  ev = GNUNET_MQ_msg_extra (vm, vi->last_value_size, GNUNET_MESSAGE_TYPE_SENSOR_READING);
+  ev = GNUNET_MQ_msg_extra (vm, vi->last_value_size,
+                            GNUNET_MESSAGE_TYPE_SENSOR_READING);
   GNUNET_CRYPTO_hash (vi->sensor->name, strlen (vi->sensor->name) + 1,
                       &vm->sensorname_hash);
   vm->sensorversion_major = htons (vi->sensor->version_major);
@@ -541,13 +547,20 @@ create_value_message (struct ValueInfo *vi)
  *
  * @param mq Message queue to put the message in
  * @param ai Anomaly info to report
+ * @param p2p Is the report sent to a neighboring peer
  */
 static void
-send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai)
+send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai,
+                     int p2p)
 {
   struct GNUNET_MQ_Envelope *ev;
+  int type;
 
-  ev = create_anomaly_report_message (ai);
+  type =
+      (GNUNET_YES ==
+       p2p) ? GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P :
+      GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT;
+  ev = create_anomaly_report_message (ai, type);
   GNUNET_MQ_send (mq, ev);
 }
 
@@ -572,45 +585,58 @@ handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other,
 {
   struct GNUNET_SENSOR_AnomalyReportMessage *arm;
   struct GNUNET_SENSOR_SensorInfo *sensor;
-  struct AnomalyInfo *ai;
+  struct AnomalyInfo *my_anomaly_info;
   struct CadetPeer *cadetp;
-  int peer_in_list;
+  int peer_anomalous;
+  int peer_in_anomalous_list;
 
   arm = (struct GNUNET_SENSOR_AnomalyReportMessage *) message;
   sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &arm->sensorname_hash);
-  if (NULL == sensor || sensor->version_major != arm->sensorversion_major ||
-      sensor->version_minor != arm->sensorversion_minor)
+  if (NULL == sensor ||
+      sensor->version_major != ntohs (arm->sensorversion_major) ||
+      sensor->version_minor != ntohs (arm->sensorversion_minor))
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
          "I don't have the sensor reported by the peer `%s'.\n",
          GNUNET_i2s (other));
     return GNUNET_OK;
   }
-  ai = get_anomaly_info_by_sensor (sensor);
-  GNUNET_assert (NULL != ai);
-  peer_in_list =
-      GNUNET_CONTAINER_multipeermap_contains (ai->anomalous_neighbors, other);
-  if (GNUNET_YES == ai->anomalous)
+  my_anomaly_info = get_anomaly_info_by_sensor (sensor);
+  GNUNET_assert (NULL != my_anomaly_info);
+  peer_in_anomalous_list =
+      GNUNET_CONTAINER_multipeermap_contains
+      (my_anomaly_info->anomalous_neighbors, other);
+  peer_anomalous = ntohs (arm->anomalous);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received an anomaly update from neighbour `%s' (%d).\n",
+       GNUNET_i2s (other), peer_anomalous);
+  if (GNUNET_YES == peer_anomalous)
   {
-    if (GNUNET_YES == peer_in_list)
+    if (GNUNET_YES == peer_in_anomalous_list)   /* repeated positive report */
       GNUNET_break_op (0);
     else
-      GNUNET_CONTAINER_multipeermap_put (ai->anomalous_neighbors, other, NULL,
+      GNUNET_CONTAINER_multipeermap_put (my_anomaly_info->anomalous_neighbors,
+                                         other, NULL,
                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
   }
   else
   {
-    if (GNUNET_NO == peer_in_list)
+    if (GNUNET_NO == peer_in_anomalous_list)    /* repeated negative report */
       GNUNET_break_op (0);
     else
-      GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors, other);
+      GNUNET_CONTAINER_multipeermap_remove_all
+          (my_anomaly_info->anomalous_neighbors, other);
   }
-  /* Send anomaly update to collection point */
-  if (NULL != ai->sensor->collection_point &&
-        GNUNET_YES == ai->sensor->report_anomalies)
+  /* Send anomaly update to collection point only if I have the same anomaly */
+  if (GNUNET_YES == my_anomaly_info->anomalous &&
+      NULL != sensor->collection_point &&
+      GNUNET_YES == sensor->report_anomalies)
   {
-    cadetp = get_cadet_peer (*ai->sensor->collection_point);
-    send_anomaly_report (cadetp->mq, ai);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Neighbor update triggered sending anomaly report to collection point `%s'.\n",
+         GNUNET_i2s (sensor->collection_point));
+    cadetp = get_cadet_peer (*sensor->collection_point);
+    send_anomaly_report (cadetp->mq, my_anomaly_info, GNUNET_NO);
   }
   return GNUNET_OK;
 }
@@ -636,8 +662,7 @@ value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
 
   if (NULL != emsg)
   {
-    LOG (GNUNET_ERROR_TYPE_ERROR,
-        _("PEERSTORE error: %s.\n"), emsg);
+    LOG (GNUNET_ERROR_TYPE_ERROR, _("PEERSTORE error: %s.\n"), emsg);
     return GNUNET_YES;
   }
   if (NULL != vi->last_value)
@@ -647,7 +672,7 @@ value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
   }
   vi->last_value = GNUNET_memdup (record->value, record->value_size);
   vi->last_value_size = record->value_size;
-  vi->last_value_timestamp = GNUNET_TIME_absolute_get();
+  vi->last_value_timestamp = GNUNET_TIME_absolute_get ();
   vi->last_value_reported = GNUNET_NO;
   return GNUNET_YES;
 }
@@ -671,11 +696,13 @@ core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
 
   if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
     return;
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Core peer `%s' disconnected.\n",
+       GNUNET_i2s (peer));
   neighborhood--;
   corep = corep_head;
   while (NULL != corep)
   {
-    if (peer == corep->peer_id)
+    if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, corep->peer_id))
     {
       GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
       destroy_core_peer (corep);
@@ -683,9 +710,6 @@ core_disconnect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
     }
     corep = corep->next;
   }
-  LOG (GNUNET_ERROR_TYPE_ERROR,
-       _("Received disconnect notification from CORE"
-         " for a peer we didn't know about.\n"));
 }
 
 
@@ -703,6 +727,8 @@ core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
 
   if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
     return;
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core peer `%s'.\n",
+       GNUNET_i2s (peer));
   neighborhood++;
   corep = GNUNET_new (struct CorePeer);
   corep->peer_id = (struct GNUNET_PeerIdentity *) peer;
@@ -713,7 +739,12 @@ core_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
   while (NULL != ai)
   {
     if (GNUNET_YES == ai->anomalous)
-      send_anomaly_report (corep->mq, ai);
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Updating newly connected neighbor `%s' with anomalous sensor.\n",
+           GNUNET_i2s (peer));
+      send_anomaly_report (corep->mq, ai, GNUNET_YES);
+    }
     ai = ai->next;
   }
 }
@@ -772,7 +803,10 @@ cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel,
   struct CadetPeer *cadetp = channel_ctx;
 
   if (GNUNET_YES == cadetp->destroying)
-      return;
+    return;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "CADET channel was destroyed by remote peer `%s' or failed to start.\n",
+       GNUNET_i2s (&cadetp->peer_id));
   GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
   cadetp->channel = NULL;
   destroy_cadet_peer (cadetp);
@@ -801,6 +835,7 @@ SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor,
 
   if (GNUNET_NO == module_running)
     return;
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Received an external anomaly update.\n");
   ai = get_anomaly_info_by_sensor (sensor);
   GNUNET_assert (NULL != ai);
   ai->anomalous = anomalous;
@@ -808,14 +843,21 @@ SENSOR_reporting_anomaly_update (struct GNUNET_SENSOR_SensorInfo *sensor,
   corep = corep_head;
   while (NULL != corep)
   {
-    send_anomaly_report (corep->mq, ai);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Sending an anomaly report to neighbor `%s'.\n",
+         GNUNET_i2s (corep->peer_id));
+    send_anomaly_report (corep->mq, ai, GNUNET_YES);
     corep = corep->next;
   }
+  /* Report change to collection point if need */
   if (NULL != ai->sensor->collection_point &&
       GNUNET_YES == ai->sensor->report_anomalies)
   {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Local anomaly update triggered sending anomaly report to collection point `%s'.\n",
+         GNUNET_i2s (ai->sensor->collection_point));
     cadetp = get_cadet_peer (*ai->sensor->collection_point);
-    send_anomaly_report (cadetp->mq, ai);
+    send_anomaly_report (cadetp->mq, ai, GNUNET_NO);
   }
 }
 
@@ -842,12 +884,10 @@ report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   vi->reporting_task =
       GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
                                     &report_value, vi);
-  if (0 == vi->last_value_size ||
-      GNUNET_YES == vi->last_value_reported)
+  if (0 == vi->last_value_size || GNUNET_YES == vi->last_value_reported)
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
-         "Did not receive a fresh value from `%s' to report.\n",
-         sensor->name);
+         "Did not receive a fresh value from `%s' to report.\n", sensor->name);
     return;
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -883,6 +923,7 @@ init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
 
   /* Create sensor anomaly info context */
   ai = GNUNET_new (struct AnomalyInfo);
+
   ai->sensor = sensor;
   ai->anomalous = GNUNET_NO;
   ai->anomalous_neighbors =
@@ -902,11 +943,11 @@ init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
   vi->last_value_size = 0;
   vi->last_value_reported = GNUNET_NO;
   vi->wc =
-    GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
-                            &value_watch_cb, vi);
+      GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
+                              &value_watch_cb, vi);
   vi->reporting_task =
-    GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
-                                  &report_value, vi);
+      GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
+                                    &report_value, vi);
   GNUNET_CONTAINER_DLL_insert (vi_head, vi_tail, vi);
   return GNUNET_YES;
 }
@@ -921,10 +962,10 @@ init_sensor_reporting (void *cls, const struct GNUNET_HashCode *key,
  */
 int
 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
-                                struct GNUNET_CONTAINER_MultiHashMap *s)
+                        struct GNUNET_CONTAINER_MultiHashMap *s)
 {
   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
-    {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT,
+    {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P,
      sizeof (struct GNUNET_SENSOR_AnomalyReportMessage)},
     {NULL, 0, 0}
   };