From 0255e3026f4553ecfe5098d7f96c05ffca982a52 Mon Sep 17 00:00:00 2001 From: Omar Tarabai Date: Wed, 2 Jul 2014 12:51:48 +0000 Subject: [PATCH] sensor: completed reporting to collection point --- src/include/gnunet_sensor_service.h | 39 ++++ src/include/gnunet_sensordashboard_service.h | 48 +++++ src/sensor/gnunet-service-sensor-reporting.c | 185 ++++++++++++++---- .../gnunet-service-sensordashboard.c | 1 + src/sensordashboard/sensordashboard.h | 30 +++ 5 files changed, 263 insertions(+), 40 deletions(-) create mode 100644 src/include/gnunet_sensordashboard_service.h create mode 100644 src/sensordashboard/sensordashboard.h diff --git a/src/include/gnunet_sensor_service.h b/src/include/gnunet_sensor_service.h index 19dcbbb75..1bfed345a 100644 --- a/src/include/gnunet_sensor_service.h +++ b/src/include/gnunet_sensor_service.h @@ -223,6 +223,45 @@ struct SensorInfoShort }; +GNUNET_NETWORK_STRUCT_BEGIN + +/** + * Used to communicate sensor readings to + * collection points (SENSORDASHBAORD service) + */ +struct GNUNET_SENSOR_Reading +{ + + /** + * Size of the sensor name value, allocated + * at position 0 after this struct + */ + size_t sensorname_size; + + /** + * First part of sensor version number + */ + uint16_t sensorversion_major; + + /** + * Second part of sensor version number + */ + uint16_t sensorversion_minor; + + /** + * Timestamp of recorded reading + */ + uint64_t timestamp; + + /** + * Size of reading value, allocation + * at poistion 1 after this struct + */ + size_t value_size; + +}; +GNUNET_NETWORK_STRUCT_END + /** * Type of an iterator over sensor definitions. * diff --git a/src/include/gnunet_sensordashboard_service.h b/src/include/gnunet_sensordashboard_service.h new file mode 100644 index 000000000..b3d94e4df --- /dev/null +++ b/src/include/gnunet_sensordashboard_service.h @@ -0,0 +1,48 @@ +/* + This file is part of GNUnet + (C) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. + */ + +/** + * @file include/gnunet_sensordashboard_service.h + * @brief API to the sensordashboard service + * @author Omar Tarabai + */ +#ifndef GNUNET_SENSORDASHBOARD_SERVICE_H +#define GNUNET_SENSORDASHBOARD_SERVICE_H + +#include "platform.h" +#include "gnunet_util_lib.h" + +#ifdef __cplusplus +extern "C" +{ +#if 0 /* keep Emacsens' auto-indent happy */ +} +#endif +#endif + + +#if 0 /* keep Emacsens' auto-indent happy */ +{ +#endif +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/sensor/gnunet-service-sensor-reporting.c b/src/sensor/gnunet-service-sensor-reporting.c index 952d7b842..6c8a05490 100644 --- a/src/sensor/gnunet-service-sensor-reporting.c +++ b/src/sensor/gnunet-service-sensor-reporting.c @@ -75,11 +75,9 @@ struct ReportingContext size_t last_value_size; /** - * Incremented with every lock request - * (e.g. to send last value). - * Change @last_value only when @value_lock = 0 + * Timestamp of last value reading */ - int value_lock; + uint64_t timestamp; }; @@ -115,6 +113,22 @@ struct CadetChannelContext */ int sending; + /** + * Pointer to a pending message to be sent over the channel + */ + void *pending_msg; + + /** + * Size of @pending_msg + */ + size_t pending_msg_size; + + /** + * Handle to CADET tranmission request in case we are sending + * (sending == GNUNET_YES) + */ + struct GNUNET_CADET_TransmitHandle *th; + }; /** @@ -182,15 +196,45 @@ destroy_reporting_context (struct ReportingContext *rc) GNUNET_free(rc); } +/** + * Destroy a CADET channel context struct + */ +static void +destroy_cadet_channel_context (struct CadetChannelContext *cc) +{ + if (NULL != cc->th) + { + GNUNET_CADET_notify_transmit_ready_cancel (cc->th); + cc->th = NULL; + } + if (NULL != cc->pending_msg) + { + GNUNET_free (cc->pending_msg); + cc->pending_msg = NULL; + } + if (NULL != cc->c) + { + GNUNET_CADET_channel_destroy (cc->c); + cc->c = NULL; + } + GNUNET_free (cc); +} + /** * Stop sensor reporting module */ void SENSOR_reporting_stop () { struct ReportingContext *rc; + struct CadetChannelContext *cc; LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor reporting module.\n"); - /* TODO: destroy cadet channels */ + while (NULL != cc_head) + { + cc = cc_head; + GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc); + destroy_cadet_channel_context (cc); + } while (NULL != rc_head) { rc = rc_head; @@ -216,7 +260,7 @@ void SENSOR_reporting_stop () * @param pid Peer Identity * @return Context of established cadet channel */ -struct CadetChannelContext * +static struct CadetChannelContext * get_cadet_channel (struct GNUNET_PeerIdentity pid) { struct CadetChannelContext *cc; @@ -240,6 +284,40 @@ get_cadet_channel (struct GNUNET_PeerIdentity pid) return cc; } +/** + * Construct a reading message ready to be sent over CADET channel + * + * @param rc reporting context to read data from + * @param msg used to return the created message structure + * @return size of created message + */ +static size_t +construct_reading_message (struct ReportingContext *rc, + struct GNUNET_SENSOR_Reading **msg) +{ + struct GNUNET_SENSOR_Reading *ret; + size_t sensorname_size; + size_t total_size; + void *dummy; + + sensorname_size = strlen (rc->sensor->name) + 1; + total_size = sizeof(struct GNUNET_SENSOR_Reading) + + sensorname_size + + rc->last_value_size; + ret = GNUNET_malloc (total_size); + ret->sensorname_size = sensorname_size; + ret->sensorversion_major = rc->sensor->version_major; + ret->sensorversion_minor = rc->sensor->version_minor; + ret->timestamp = rc->timestamp; + ret->value_size = rc->last_value_size; + dummy = &ret[1]; + memcpy (dummy, rc->sensor->name, sensorname_size); + dummy += sensorname_size; + memcpy (dummy, rc->last_value, rc->last_value_size); + *msg = ret; + return total_size; +} + /** * Function called to notify a client about the connection begin ready * to queue more data. @a buf will be NULL and @a size zero if the @@ -250,13 +328,27 @@ get_cadet_channel (struct GNUNET_PeerIdentity pid) * @param buf where the callee should write the message * @return number of bytes written to @a buf */ -size_t +static size_t do_report_collection_point (void *cls, size_t size, void *buf) { - /* TODO: check error from CADET */ - /* TODO: do transfer */ - /* TODO: cc->sending, rc->value_lock */ - return 0; + struct CadetChannelContext *cc = cls; + size_t written = 0; + + LOG (GNUNET_ERROR_TYPE_DEBUG, "Copying to CADET transmit buffer.\n"); + cc->sending = GNUNET_NO; + if (NULL == buf || size != cc->pending_msg_size) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "CADET failed to transmit message to collection point, discarding."); + } + else + { + memcpy (buf, cc->pending_msg, cc->pending_msg_size); + written = cc->pending_msg_size; + } + GNUNET_free (cc->pending_msg); + cc->pending_msg_size = 0; + return written; } /** @@ -265,38 +357,49 @@ do_report_collection_point (void *cls, size_t size, void *buf) * @param cls closure, a 'struct CollectionReportingContext *' * @param tc unused */ -void report_collection_point +static void report_collection_point (void *cls, const struct GNUNET_SCHEDULER_TaskContext* tc) { struct ReportingContext *rc = cls; struct SensorInfo *sensor = rc->sensor; struct CadetChannelContext *cc; + struct GNUNET_SENSOR_Reading *msg; + size_t msg_size; rc->cp_task = GNUNET_SCHEDULER_NO_TASK; + if (0 == rc->last_value_size) /* Did not receive a sensor value yet */ + { + LOG (GNUNET_ERROR_TYPE_WARNING, "Did not receive a value from `%s' " + "to report yet.\n", rc->sensor->name); + rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval, + &report_collection_point, rc); + return; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, "Now trying to report last seen value of `%s' " + "to collection point.\n", rc->sensor->name); GNUNET_assert (NULL != sensor->collection_point); cc = get_cadet_channel (*sensor->collection_point); if (GNUNET_YES == cc->sending) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Cadet channel to collection point busy, trying again on next interval."); - rc->cp_task = - GNUNET_SCHEDULER_add_delayed (sensor->collection_interval, - &report_collection_point, - rc); + "Cadet channel to collection point busy, " + "trying again for sensor `%s' on next interval.\n", rc->sensor->name); + rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval, + &report_collection_point, rc); return; } + msg_size = construct_reading_message (rc, &msg); cc->sending = GNUNET_YES; - rc->value_lock ++; - /* TODO: construct message */ - /* TODO: if constructed message is added to cc, no need for rc->value_lock */ - GNUNET_CADET_notify_transmit_ready (cc->c, + cc->pending_msg = msg; + cc->pending_msg_size = msg_size; + cc->th = GNUNET_CADET_notify_transmit_ready (cc->c, GNUNET_YES, sensor->collection_interval, - rc->last_value_size, /* FIXME: size of constructed message */ + msg_size, &do_report_collection_point, - rc); - /* TODO */ - /* TODO: reschedule reporting */ + cc); + rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval, + &report_collection_point, rc); } /* @@ -309,16 +412,10 @@ sensor_watch_cb (void *cls, { struct ReportingContext *rc = cls; + LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a sensor `%s' watch value, " + "updating notification last_value.\n", rc->sensor->name); if (NULL != emsg) return GNUNET_YES; - if (rc->value_lock > 0) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Did not update reporting context of sensor `%s'" - " because value is locked for sending.", - rc->sensor->name); - return GNUNET_YES; - } if (NULL != rc->last_value) { GNUNET_free (rc->last_value); @@ -327,6 +424,7 @@ sensor_watch_cb (void *cls, rc->last_value = GNUNET_malloc(record->value_size); memcpy (rc->last_value, record->value, record->value_size); rc->last_value_size = record->value_size; + rc->timestamp = GNUNET_TIME_absolute_get().abs_value_us; return GNUNET_YES; } @@ -354,7 +452,6 @@ init_sensor_reporting (void *cls, rc->sensor = sensor; rc->last_value = NULL; rc->last_value_size = 0; - rc->value_lock = 0; rc->wc = GNUNET_PEERSTORE_watch(peerstore, "sensor", &mypeerid, @@ -364,9 +461,11 @@ init_sensor_reporting (void *cls, if (NULL != sensor->collection_point) { LOG (GNUNET_ERROR_TYPE_INFO, - "Will start reporting sensor `%s' values to collection point `%s' every %s.\n", + "Will start reporting sensor `%s' values to " + "collection point `%s' every %s.\n", sensor->name, GNUNET_i2s_full(sensor->collection_point), - GNUNET_STRINGS_relative_time_to_string(sensor->collection_interval, GNUNET_YES)); + GNUNET_STRINGS_relative_time_to_string(sensor->collection_interval, + GNUNET_YES)); rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval, &report_collection_point, @@ -377,7 +476,8 @@ init_sensor_reporting (void *cls, LOG (GNUNET_ERROR_TYPE_INFO, "Will start reporting sensor `%s' values to p2p network every %s.\n", sensor->name, - GNUNET_STRINGS_relative_time_to_string(sensor->p2p_interval, GNUNET_YES)); + GNUNET_STRINGS_relative_time_to_string(sensor->p2p_interval, + GNUNET_YES)); } GNUNET_CONTAINER_DLL_insert (rc_head, rc_tail, rc); return GNUNET_YES; @@ -400,8 +500,12 @@ static void cadet_channel_destroyed (void *cls, { struct CadetChannelContext *cc = channel_ctx; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received a `channel destroyed' notification from CADET, " + "cleaning up.\n"); GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc); - GNUNET_free (cc); + cc->c = NULL; + destroy_cadet_channel_context (cc); } /** @@ -419,10 +523,9 @@ SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c, {NULL, 0, 0} }; + LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n"); GNUNET_assert(NULL != sensors); cfg = c; - GNUNET_CRYPTO_get_peer_identity(cfg, &mypeerid); - GNUNET_CONTAINER_multihashmap_iterate(sensors, &init_sensor_reporting, NULL); peerstore = GNUNET_PEERSTORE_connect(cfg); if (NULL == peerstore) { @@ -444,6 +547,8 @@ SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c, SENSOR_reporting_stop (); return GNUNET_SYSERR; } + GNUNET_CRYPTO_get_peer_identity(cfg, &mypeerid); + GNUNET_CONTAINER_multihashmap_iterate(sensors, &init_sensor_reporting, NULL); return GNUNET_OK; } diff --git a/src/sensordashboard/gnunet-service-sensordashboard.c b/src/sensordashboard/gnunet-service-sensordashboard.c index b3fbf5d9d..5f451844e 100644 --- a/src/sensordashboard/gnunet-service-sensordashboard.c +++ b/src/sensordashboard/gnunet-service-sensordashboard.c @@ -26,6 +26,7 @@ #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_applications.h" +#include "sensordashboard.h" #include "gnunet_cadet_service.h" static struct GNUNET_CADET_Handle *cadet; diff --git a/src/sensordashboard/sensordashboard.h b/src/sensordashboard/sensordashboard.h new file mode 100644 index 000000000..c7758e26e --- /dev/null +++ b/src/sensordashboard/sensordashboard.h @@ -0,0 +1,30 @@ +/* + This file is part of GNUnet + (C) 2012-2013 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. + */ +/** + * @file sensordashboard/sensordashboard.h + * @brief IPC messages and private service declarations + * @author Omar Tarabai + */ + +#include "gnunet_sensordashboard_service.h" + +GNUNET_NETWORK_STRUCT_BEGIN + +GNUNET_NETWORK_STRUCT_END -- 2.25.1