return cadetp;
cadetp = cadetp->next;
}
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating a CADET connection to peer `%s'.\n",
+ GNUNET_i2s (&pid));
/* Not found, create struct and channel */
cadetp = GNUNET_new (struct CadetPeer);
cadetp->peer_id = pid;
cadetp->channel =
GNUNET_CADET_channel_create (cadet, cadetp, &pid,
GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
- GNUNET_CADET_OPTION_DEFAULT);
+ GNUNET_CADET_OPTION_RELIABLE);
cadetp->mq = GNUNET_CADET_mq_create (cadetp->channel);
GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp);
return cadetp;
* MQ envelope.
*
* @param ai Anomaly info struct to use
+ * @param type Message type
* @return Envelope with message
*/
static struct GNUNET_MQ_Envelope *
-create_anomaly_report_message (struct AnomalyInfo *ai)
+create_anomaly_report_message (struct AnomalyInfo *ai, int type)
{
struct GNUNET_SENSOR_AnomalyReportMessage *arm;
struct GNUNET_MQ_Envelope *ev;
- ev = GNUNET_MQ_msg (arm, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT);
+ ev = GNUNET_MQ_msg (arm, type);
GNUNET_CRYPTO_hash (ai->sensor->name, strlen (ai->sensor->name) + 1,
&arm->sensorname_hash);
arm->sensorversion_major = htons (ai->sensor->version_major);
arm->sensorversion_minor = htons (ai->sensor->version_minor);
arm->anomalous = htons (ai->anomalous);
arm->anomalous_neighbors =
- ((float) GNUNET_CONTAINER_multipeermap_size (ai->anomalous_neighbors)) /
- neighborhood;
+ (0 ==
+ neighborhood) ? 0 : ((float)
+ GNUNET_CONTAINER_multipeermap_size
+ (ai->anomalous_neighbors)) / neighborhood;
return ev;
}
struct GNUNET_SENSOR_ValueMessage *vm;
struct GNUNET_MQ_Envelope *ev;
- ev = GNUNET_MQ_msg_extra (vm, vi->last_value_size, GNUNET_MESSAGE_TYPE_SENSOR_READING);
+ ev = GNUNET_MQ_msg_extra (vm, vi->last_value_size,
+ GNUNET_MESSAGE_TYPE_SENSOR_READING);
GNUNET_CRYPTO_hash (vi->sensor->name, strlen (vi->sensor->name) + 1,
&vm->sensorname_hash);
vm->sensorversion_major = htons (vi->sensor->version_major);
*
* @param mq Message queue to put the message in
* @param ai Anomaly info to report
+ * @param p2p Is the report sent to a neighboring peer
*/
static void
-send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai)
+send_anomaly_report (struct GNUNET_MQ_Handle *mq, struct AnomalyInfo *ai,
+ int p2p)
{
struct GNUNET_MQ_Envelope *ev;
+ int type;
- ev = create_anomaly_report_message (ai);
+ type =
+ (GNUNET_YES ==
+ p2p) ? GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P :
+ GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT;
+ ev = create_anomaly_report_message (ai, type);
GNUNET_MQ_send (mq, ev);
}
{
struct GNUNET_SENSOR_AnomalyReportMessage *arm;
struct GNUNET_SENSOR_SensorInfo *sensor;
- struct AnomalyInfo *ai;
+ struct AnomalyInfo *my_anomaly_info;
struct CadetPeer *cadetp;
- int peer_in_list;
+ int peer_anomalous;
+ int peer_in_anomalous_list;
arm = (struct GNUNET_SENSOR_AnomalyReportMessage *) message;
sensor = GNUNET_CONTAINER_multihashmap_get (sensors, &arm->sensorname_hash);
- if (NULL == sensor || sensor->version_major != arm->sensorversion_major ||
- sensor->version_minor != arm->sensorversion_minor)
+ if (NULL == sensor ||
+ sensor->version_major != ntohs (arm->sensorversion_major) ||
+ sensor->version_minor != ntohs (arm->sensorversion_minor))
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"I don't have the sensor reported by the peer `%s'.\n",
GNUNET_i2s (other));
return GNUNET_OK;
}
- ai = get_anomaly_info_by_sensor (sensor);
- GNUNET_assert (NULL != ai);
- peer_in_list =
- GNUNET_CONTAINER_multipeermap_contains (ai->anomalous_neighbors, other);
- if (GNUNET_YES == ai->anomalous)
+ my_anomaly_info = get_anomaly_info_by_sensor (sensor);
+ GNUNET_assert (NULL != my_anomaly_info);
+ peer_in_anomalous_list =
+ GNUNET_CONTAINER_multipeermap_contains
+ (my_anomaly_info->anomalous_neighbors, other);
+ peer_anomalous = ntohs (arm->anomalous);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received an anomaly update from neighbour `%s' (%d).\n",
+ GNUNET_i2s (other), peer_anomalous);
+ if (GNUNET_YES == peer_anomalous)
{
- if (GNUNET_YES == peer_in_list)
+ if (GNUNET_YES == peer_in_anomalous_list) /* repeated positive report */
GNUNET_break_op (0);
else
- GNUNET_CONTAINER_multipeermap_put (ai->anomalous_neighbors, other, NULL,
+ GNUNET_CONTAINER_multipeermap_put (my_anomaly_info->anomalous_neighbors,
+ other, NULL,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
}
else
{
- if (GNUNET_NO == peer_in_list)
+ if (GNUNET_NO == peer_in_anomalous_list) /* repeated negative report */
GNUNET_break_op (0);
else
- GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors, other);
+ GNUNET_CONTAINER_multipeermap_remove_all
+ (my_anomaly_info->anomalous_neighbors, other);
}
- /* Send anomaly update to collection point */
- if (NULL != ai->sensor->collection_point &&
- GNUNET_YES == ai->sensor->report_anomalies)
+ /* Send anomaly update to collection point only if I have the same anomaly */
+ if (GNUNET_YES == my_anomaly_info->anomalous &&
+ NULL != sensor->collection_point &&
+ GNUNET_YES == sensor->report_anomalies)
{
- cadetp = get_cadet_peer (*ai->sensor->collection_point);
- send_anomaly_report (cadetp->mq, ai);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Neighbor update triggered sending anomaly report to collection point `%s'.\n",
+ GNUNET_i2s (sensor->collection_point));
+ cadetp = get_cadet_peer (*sensor->collection_point);
+ send_anomaly_report (cadetp->mq, my_anomaly_info, GNUNET_NO);
}
return GNUNET_OK;
}
if (NULL != emsg)
{
- LOG (GNUNET_ERROR_TYPE_ERROR,
- _("PEERSTORE error: %s.\n"), emsg);
+ LOG (GNUNET_ERROR_TYPE_ERROR, _("PEERSTORE error: %s.\n"), emsg);
return GNUNET_YES;
}
if (NULL != vi->last_value)
}
vi->last_value = GNUNET_memdup (record->value, record->value_size);
vi->last_value_size = record->value_size;
- vi->last_value_timestamp = GNUNET_TIME_absolute_get();
+ vi->last_value_timestamp = GNUNET_TIME_absolute_get ();
vi->last_value_reported = GNUNET_NO;
return GNUNET_YES;
}
if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
return;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Core peer `%s' disconnected.\n",
+ GNUNET_i2s (peer));
neighborhood--;
corep = corep_head;
while (NULL != corep)
{
- if (peer == corep->peer_id)
+ if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, corep->peer_id))
{
GNUNET_CONTAINER_DLL_remove (corep_head, corep_tail, corep);
destroy_core_peer (corep);
}
corep = corep->next;
}
- LOG (GNUNET_ERROR_TYPE_ERROR,
- _("Received disconnect notification from CORE"
- " for a peer we didn't know about.\n"));
}
if (0 == GNUNET_CRYPTO_cmp_peer_identity (&mypeerid, peer))
return;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core peer `%s'.\n",
+ GNUNET_i2s (peer));
neighborhood++;
corep = GNUNET_new (struct CorePeer);
corep->peer_id = (struct GNUNET_PeerIdentity *) peer;
while (NULL != ai)
{
if (GNUNET_YES == ai->anomalous)
- send_anomaly_report (corep->mq, ai);
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Updating newly connected neighbor `%s' with anomalous sensor.\n",
+ GNUNET_i2s (peer));
+ send_anomaly_report (corep->mq, ai, GNUNET_YES);
+ }
ai = ai->next;
}
}
struct CadetPeer *cadetp = channel_ctx;
if (GNUNET_YES == cadetp->destroying)
- return;
+ return;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "CADET channel was destroyed by remote peer `%s' or failed to start.\n",
+ GNUNET_i2s (&cadetp->peer_id));
GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
cadetp->channel = NULL;
destroy_cadet_peer (cadetp);
if (GNUNET_NO == module_running)
return;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Received an external anomaly update.\n");
ai = get_anomaly_info_by_sensor (sensor);
GNUNET_assert (NULL != ai);
ai->anomalous = anomalous;
corep = corep_head;
while (NULL != corep)
{
- send_anomaly_report (corep->mq, ai);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending an anomaly report to neighbor `%s'.\n",
+ GNUNET_i2s (corep->peer_id));
+ send_anomaly_report (corep->mq, ai, GNUNET_YES);
corep = corep->next;
}
+ /* Report change to collection point if need */
if (NULL != ai->sensor->collection_point &&
GNUNET_YES == ai->sensor->report_anomalies)
{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Local anomaly update triggered sending anomaly report to collection point `%s'.\n",
+ GNUNET_i2s (ai->sensor->collection_point));
cadetp = get_cadet_peer (*ai->sensor->collection_point);
- send_anomaly_report (cadetp->mq, ai);
+ send_anomaly_report (cadetp->mq, ai, GNUNET_NO);
}
}
vi->reporting_task =
GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
&report_value, vi);
- if (0 == vi->last_value_size ||
- GNUNET_YES == vi->last_value_reported)
+ if (0 == vi->last_value_size || GNUNET_YES == vi->last_value_reported)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
- "Did not receive a fresh value from `%s' to report.\n",
- sensor->name);
+ "Did not receive a fresh value from `%s' to report.\n", sensor->name);
return;
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
/* Create sensor anomaly info context */
ai = GNUNET_new (struct AnomalyInfo);
+
ai->sensor = sensor;
ai->anomalous = GNUNET_NO;
ai->anomalous_neighbors =
vi->last_value_size = 0;
vi->last_value_reported = GNUNET_NO;
vi->wc =
- GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
- &value_watch_cb, vi);
+ GNUNET_PEERSTORE_watch (peerstore, "sensor", &mypeerid, sensor->name,
+ &value_watch_cb, vi);
vi->reporting_task =
- GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
- &report_value, vi);
+ GNUNET_SCHEDULER_add_delayed (sensor->value_reporting_interval,
+ &report_value, vi);
GNUNET_CONTAINER_DLL_insert (vi_head, vi_tail, vi);
return GNUNET_YES;
}
*/
int
SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
- struct GNUNET_CONTAINER_MultiHashMap *s)
+ struct GNUNET_CONTAINER_MultiHashMap *s)
{
static struct GNUNET_CORE_MessageHandler core_handlers[] = {
- {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT,
+ {&handle_anomaly_report, GNUNET_MESSAGE_TYPE_SENSOR_ANOMALY_REPORT_P2P,
sizeof (struct GNUNET_SENSOR_AnomalyReportMessage)},
{NULL, 0, 0}
};