#include "sensor.h"
#include "gnunet_peerstore_service.h"
#include "gnunet_cadet_service.h"
+#include "gnunet_applications.h"
#define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
/**
- * Context of reporting to collection
- * point
+ * Context of reporting operations
*/
-struct CollectionReportingContext
+struct ReportingContext
{
+ /**
+ * DLL
+ */
+ struct ReportingContext *prev;
+
+ /**
+ * DLL
+ */
+ struct ReportingContext *next;
+
/**
* Sensor information
*/
struct SensorInfo *sensor;
/**
- * Reporting task (OR GNUNET_SCHEDULER_NO_TASK)
+ * Collection point reporting task
+ * (OR GNUNET_SCHEDULER_NO_TASK)
*/
- GNUNET_SCHEDULER_TaskIdentifier task;
+ GNUNET_SCHEDULER_TaskIdentifier cp_task;
+
+ /**
+ * Watcher of sensor values
+ */
+ struct GNUNET_PEERSTORE_WatchContext *wc;
+
+ /**
+ * Last value read from sensor
+ */
+ void *last_value;
+
+ /**
+ * Size of @last_value
+ */
+ size_t last_value_size;
+
+ /**
+ * Incremented with every lock request
+ * (e.g. to send last value).
+ * Change @last_value only when @value_lock = 0
+ */
+ int value_lock;
+
+};
+
+/**
+ * Context of a created CADET channel
+ */
+struct CadetChannelContext
+{
+
+ /**
+ * DLL
+ */
+ struct CadetChannelContext *prev;
+
+ /**
+ * DLL
+ */
+ struct CadetChannelContext *next;
+
+ /**
+ * Peer Id of
+ */
+ struct GNUNET_PeerIdentity pid;
+
+ /**
+ * CADET channel handle
+ */
+ struct GNUNET_CADET_Channel *c;
+
+ /**
+ * Are we sending data on this channel?
+ * #GNUNET_YES / #GNUNET_NO
+ */
+ int sending;
};
/**
* My peer id
*/
-static struct GNUNET_PeerIdentity peerid;
+static struct GNUNET_PeerIdentity mypeerid;
/**
* Handle to CADET service
*/
static struct GNUNET_CADET_Handle *cadet;
+/**
+ * Head of DLL of all reporting contexts
+ */
+struct ReportingContext *rc_head;
+
+/**
+ * Tail of DLL of all reporting contexts
+ */
+struct ReportingContext *rc_tail;
+
+/**
+ * Head of DLL of all cadet channels
+ */
+struct CadetChannelContext *cc_head;
+
+/**
+ * Tail of DLL of all cadet channels
+ */
+struct CadetChannelContext *cc_tail;
+
+
+/**
+ * Destroy a reporting context structure
+ */
+static void
+destroy_reporting_context (struct ReportingContext *rc)
+{
+ if (NULL != rc->wc)
+ {
+ GNUNET_PEERSTORE_watch_cancel (rc->wc);
+ rc->wc = NULL;
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != rc->cp_task)
+ {
+ GNUNET_SCHEDULER_cancel(rc->cp_task);
+ rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (NULL != rc->last_value)
+ {
+ GNUNET_free (rc->last_value);
+ rc->last_value_size = 0;
+ }
+ GNUNET_free(rc);
+}
/**
* Stop sensor reporting module
*/
void SENSOR_reporting_stop ()
{
+ struct ReportingContext *rc;
+
LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor reporting module.\n");
+ /* TODO: destroy cadet channels */
+ while (NULL != rc_head)
+ {
+ rc = rc_head;
+ GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, rc);
+ destroy_reporting_context (rc);
+ }
if (NULL != peerstore)
{
GNUNET_PEERSTORE_disconnect (peerstore);
peerstore = NULL;
}
+ if (NULL != cadet)
+ {
+ GNUNET_CADET_disconnect (cadet);
+ cadet = NULL;
+ }
+}
+
+/**
+ * Returns CADET channel established to given peer
+ * or creates a new one
+ *
+ * @param pid Peer Identity
+ * @return Context of established cadet channel
+ */
+struct CadetChannelContext *
+get_cadet_channel (struct GNUNET_PeerIdentity pid)
+{
+ struct CadetChannelContext *cc;
+
+ cc = cc_head;
+ while (NULL != cc)
+ {
+ if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cc->pid))
+ return cc;
+ cc = cc->next;
+ }
+ cc = GNUNET_new (struct CadetChannelContext);
+ cc->c = GNUNET_CADET_channel_create(cadet,
+ cc,
+ &pid,
+ GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
+ GNUNET_CADET_OPTION_DEFAULT);
+ cc->pid = pid;
+ cc->sending = GNUNET_NO;
+ GNUNET_CONTAINER_DLL_insert (cc_head, cc_tail, cc);
+ return cc;
+}
+
+/**
+ * Function called to notify a client about the connection begin ready
+ * to queue more data. @a buf will be NULL and @a size zero if the
+ * connection was closed for writing in the meantime.
+ *
+ * @param cls closure
+ * @param size number of bytes available in @a buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to @a buf
+ */
+size_t
+do_report_collection_point (void *cls, size_t size, void *buf)
+{
+ /* TODO: check error from CADET */
+ /* TODO: do transfer */
+ /* TODO: cc->sending, rc->value_lock */
+ return 0;
}
/**
void report_collection_point
(void *cls, const struct GNUNET_SCHEDULER_TaskContext* tc)
{
- struct CollectionReportingContext *crc = cls;
+ struct ReportingContext *rc = cls;
+ struct SensorInfo *sensor = rc->sensor;
+ struct CadetChannelContext *cc;
+
+ rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_assert (NULL != sensor->collection_point);
+ cc = get_cadet_channel (*sensor->collection_point);
+ if (GNUNET_YES == cc->sending)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Cadet channel to collection point busy, trying again on next interval.");
+ rc->cp_task =
+ GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
+ &report_collection_point,
+ rc);
+ return;
+ }
+ cc->sending = GNUNET_YES;
+ rc->value_lock ++;
+ /* TODO: construct message */
+ /* TODO: if constructed message is added to cc, no need for rc->value_lock */
+ GNUNET_CADET_notify_transmit_ready (cc->c,
+ GNUNET_YES,
+ sensor->collection_interval,
+ rc->last_value_size, /* FIXME: size of constructed message */
+ &do_report_collection_point,
+ rc);
+ /* TODO */
+ /* TODO: reschedule reporting */
+}
+
+/*
+ * Sensor value watch callback
+ */
+static int
+sensor_watch_cb (void *cls,
+ struct GNUNET_PEERSTORE_Record *record,
+ char *emsg)
+{
+ struct ReportingContext *rc = cls;
- crc->task = GNUNET_SCHEDULER_NO_TASK;
+ if (NULL != emsg)
+ return GNUNET_YES;
+ if (rc->value_lock > 0)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Did not update reporting context of sensor `%s'"
+ " because value is locked for sending.",
+ rc->sensor->name);
+ return GNUNET_YES;
+ }
+ if (NULL != rc->last_value)
+ {
+ GNUNET_free (rc->last_value);
+ rc->last_value_size = 0;
+ }
+ rc->last_value = GNUNET_malloc(record->value_size);
+ memcpy (rc->last_value, record->value, record->value_size);
+ rc->last_value_size = record->value_size;
+ return GNUNET_YES;
}
/**
void *value)
{
struct SensorInfo *sensor = value;
- struct CollectionReportingContext *crc;
+ struct ReportingContext *rc;
+ if (NULL == sensor->collection_point &&
+ GNUNET_NO == sensor->p2p_report)
+ return GNUNET_YES;
+ rc = GNUNET_new (struct ReportingContext);
+ rc->sensor = sensor;
+ rc->last_value = NULL;
+ rc->last_value_size = 0;
+ rc->value_lock = 0;
+ rc->wc = GNUNET_PEERSTORE_watch(peerstore,
+ "sensor",
+ &mypeerid,
+ sensor->name,
+ &sensor_watch_cb,
+ rc);
if (NULL != sensor->collection_point)
{
LOG (GNUNET_ERROR_TYPE_INFO,
"Will start reporting sensor `%s' values to collection point `%s' every %s.\n",
sensor->name, GNUNET_i2s_full(sensor->collection_point),
GNUNET_STRINGS_relative_time_to_string(sensor->collection_interval, GNUNET_YES));
- crc = GNUNET_new (struct CollectionReportingContext);
- crc->sensor = sensor;
- crc->task =
+ rc->cp_task =
GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
&report_collection_point,
- crc);
+ rc);
}
if (GNUNET_YES == sensor->p2p_report)
{
sensor->name,
GNUNET_STRINGS_relative_time_to_string(sensor->p2p_interval, GNUNET_YES));
}
+ GNUNET_CONTAINER_DLL_insert (rc_head, rc_tail, rc);
return GNUNET_YES;
}
const struct GNUNET_CADET_Channel *channel,
void *channel_ctx)
{
+ struct CadetChannelContext *cc = channel_ctx;
+ GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
+ GNUNET_free (cc);
}
/**
GNUNET_assert(NULL != sensors);
cfg = c;
- GNUNET_CRYPTO_get_peer_identity(cfg, &peerid);
+ GNUNET_CRYPTO_get_peer_identity(cfg, &mypeerid);
GNUNET_CONTAINER_multihashmap_iterate(sensors, &init_sensor_reporting, NULL);
peerstore = GNUNET_PEERSTORE_connect(cfg);
if (NULL == peerstore)