size_t last_value_size;
/**
- * Incremented with every lock request
- * (e.g. to send last value).
- * Change @last_value only when @value_lock = 0
+ * Timestamp of last value reading
*/
- int value_lock;
+ uint64_t timestamp;
};
*/
int sending;
+ /**
+ * Pointer to a pending message to be sent over the channel
+ */
+ void *pending_msg;
+
+ /**
+ * Size of @pending_msg
+ */
+ size_t pending_msg_size;
+
+ /**
+ * Handle to CADET tranmission request in case we are sending
+ * (sending == GNUNET_YES)
+ */
+ struct GNUNET_CADET_TransmitHandle *th;
+
};
/**
GNUNET_free(rc);
}
+/**
+ * Destroy a CADET channel context struct
+ */
+static void
+destroy_cadet_channel_context (struct CadetChannelContext *cc)
+{
+ if (NULL != cc->th)
+ {
+ GNUNET_CADET_notify_transmit_ready_cancel (cc->th);
+ cc->th = NULL;
+ }
+ if (NULL != cc->pending_msg)
+ {
+ GNUNET_free (cc->pending_msg);
+ cc->pending_msg = NULL;
+ }
+ if (NULL != cc->c)
+ {
+ GNUNET_CADET_channel_destroy (cc->c);
+ cc->c = NULL;
+ }
+ GNUNET_free (cc);
+}
+
/**
* Stop sensor reporting module
*/
void SENSOR_reporting_stop ()
{
struct ReportingContext *rc;
+ struct CadetChannelContext *cc;
LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor reporting module.\n");
- /* TODO: destroy cadet channels */
+ while (NULL != cc_head)
+ {
+ cc = cc_head;
+ GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
+ destroy_cadet_channel_context (cc);
+ }
while (NULL != rc_head)
{
rc = rc_head;
* @param pid Peer Identity
* @return Context of established cadet channel
*/
-struct CadetChannelContext *
+static struct CadetChannelContext *
get_cadet_channel (struct GNUNET_PeerIdentity pid)
{
struct CadetChannelContext *cc;
return cc;
}
+/**
+ * Construct a reading message ready to be sent over CADET channel
+ *
+ * @param rc reporting context to read data from
+ * @param msg used to return the created message structure
+ * @return size of created message
+ */
+static size_t
+construct_reading_message (struct ReportingContext *rc,
+ struct GNUNET_SENSOR_Reading **msg)
+{
+ struct GNUNET_SENSOR_Reading *ret;
+ size_t sensorname_size;
+ size_t total_size;
+ void *dummy;
+
+ sensorname_size = strlen (rc->sensor->name) + 1;
+ total_size = sizeof(struct GNUNET_SENSOR_Reading) +
+ sensorname_size +
+ rc->last_value_size;
+ ret = GNUNET_malloc (total_size);
+ ret->header->size = htons (total_size);
+ ret->header->type = htons (GNUNET_MESSAGE_TYPE_SENSOR_READING);
+ ret->sensorname_size = GNUNET_htobe64 (sensorname_size);
+ ret->sensorversion_major = htons (rc->sensor->version_major);
+ ret->sensorversion_minor = htons (rc->sensor->version_minor);
+ ret->timestamp = GNUNET_htobe64 (rc->timestamp);
+ ret->value_size = GNUNET_htobe64 (rc->last_value_size);
+ dummy = &ret[1];
+ memcpy (dummy, rc->sensor->name, sensorname_size);
+ dummy += sensorname_size;
+ memcpy (dummy, rc->last_value, rc->last_value_size);
+ *msg = ret;
+ return total_size;
+}
+
/**
* Function called to notify a client about the connection begin ready
* to queue more data. @a buf will be NULL and @a size zero if the
* @param buf where the callee should write the message
* @return number of bytes written to @a buf
*/
-size_t
+static size_t
do_report_collection_point (void *cls, size_t size, void *buf)
{
- /* TODO: check error from CADET */
- /* TODO: do transfer */
- /* TODO: cc->sending, rc->value_lock */
- return 0;
+ struct CadetChannelContext *cc = cls;
+ size_t written = 0;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Copying to CADET transmit buffer.\n");
+ cc->sending = GNUNET_NO;
+ if (NULL == buf || size != cc->pending_msg_size)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "CADET failed to transmit message to collection point, discarding.");
+ }
+ else
+ {
+ memcpy (buf, cc->pending_msg, cc->pending_msg_size);
+ written = cc->pending_msg_size;
+ }
+ GNUNET_free (cc->pending_msg);
+ cc->pending_msg_size = 0;
+ return written;
}
/**
* @param cls closure, a 'struct CollectionReportingContext *'
* @param tc unused
*/
-void report_collection_point
+static void report_collection_point
(void *cls, const struct GNUNET_SCHEDULER_TaskContext* tc)
{
struct ReportingContext *rc = cls;
struct SensorInfo *sensor = rc->sensor;
struct CadetChannelContext *cc;
+ struct GNUNET_SENSOR_Reading *msg;
+ size_t msg_size;
rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
+ if (0 == rc->last_value_size) /* Did not receive a sensor value yet */
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING, "Did not receive a value from `%s' "
+ "to report yet.\n", rc->sensor->name);
+ rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
+ &report_collection_point, rc);
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Now trying to report last seen value of `%s' "
+ "to collection point.\n", rc->sensor->name);
GNUNET_assert (NULL != sensor->collection_point);
cc = get_cadet_channel (*sensor->collection_point);
if (GNUNET_YES == cc->sending)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Cadet channel to collection point busy, trying again on next interval.");
- rc->cp_task =
- GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
- &report_collection_point,
- rc);
+ "Cadet channel to collection point busy, "
+ "trying again for sensor `%s' on next interval.\n", rc->sensor->name);
+ rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
+ &report_collection_point, rc);
return;
}
+ msg_size = construct_reading_message (rc, &msg);
cc->sending = GNUNET_YES;
- rc->value_lock ++;
- /* TODO: construct message */
- /* TODO: if constructed message is added to cc, no need for rc->value_lock */
- GNUNET_CADET_notify_transmit_ready (cc->c,
+ cc->pending_msg = msg;
+ cc->pending_msg_size = msg_size;
+ cc->th = GNUNET_CADET_notify_transmit_ready (cc->c,
GNUNET_YES,
sensor->collection_interval,
- rc->last_value_size, /* FIXME: size of constructed message */
+ msg_size,
&do_report_collection_point,
- rc);
- /* TODO */
- /* TODO: reschedule reporting */
+ cc);
+ rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
+ &report_collection_point, rc);
}
/*
{
struct ReportingContext *rc = cls;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a sensor `%s' watch value, "
+ "updating notification last_value.\n", rc->sensor->name);
if (NULL != emsg)
return GNUNET_YES;
- if (rc->value_lock > 0)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Did not update reporting context of sensor `%s'"
- " because value is locked for sending.",
- rc->sensor->name);
- return GNUNET_YES;
- }
if (NULL != rc->last_value)
{
GNUNET_free (rc->last_value);
rc->last_value = GNUNET_malloc(record->value_size);
memcpy (rc->last_value, record->value, record->value_size);
rc->last_value_size = record->value_size;
+ rc->timestamp = GNUNET_TIME_absolute_get().abs_value_us;
return GNUNET_YES;
}
rc->sensor = sensor;
rc->last_value = NULL;
rc->last_value_size = 0;
- rc->value_lock = 0;
rc->wc = GNUNET_PEERSTORE_watch(peerstore,
"sensor",
&mypeerid,
if (NULL != sensor->collection_point)
{
LOG (GNUNET_ERROR_TYPE_INFO,
- "Will start reporting sensor `%s' values to collection point `%s' every %s.\n",
+ "Will start reporting sensor `%s' values to "
+ "collection point `%s' every %s.\n",
sensor->name, GNUNET_i2s_full(sensor->collection_point),
- GNUNET_STRINGS_relative_time_to_string(sensor->collection_interval, GNUNET_YES));
+ GNUNET_STRINGS_relative_time_to_string(sensor->collection_interval,
+ GNUNET_YES));
rc->cp_task =
GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
&report_collection_point,
LOG (GNUNET_ERROR_TYPE_INFO,
"Will start reporting sensor `%s' values to p2p network every %s.\n",
sensor->name,
- GNUNET_STRINGS_relative_time_to_string(sensor->p2p_interval, GNUNET_YES));
+ GNUNET_STRINGS_relative_time_to_string(sensor->p2p_interval,
+ GNUNET_YES));
}
GNUNET_CONTAINER_DLL_insert (rc_head, rc_tail, rc);
return GNUNET_YES;
{
struct CadetChannelContext *cc = channel_ctx;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received a `channel destroyed' notification from CADET, "
+ "cleaning up.\n");
GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
- GNUNET_free (cc);
+ cc->c = NULL;
+ destroy_cadet_channel_context (cc);
}
/**
{NULL, 0, 0}
};
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n");
GNUNET_assert(NULL != sensors);
cfg = c;
- GNUNET_CRYPTO_get_peer_identity(cfg, &mypeerid);
- GNUNET_CONTAINER_multihashmap_iterate(sensors, &init_sensor_reporting, NULL);
peerstore = GNUNET_PEERSTORE_connect(cfg);
if (NULL == peerstore)
{
SENSOR_reporting_stop ();
return GNUNET_SYSERR;
}
+ GNUNET_CRYPTO_get_peer_identity(cfg, &mypeerid);
+ GNUNET_CONTAINER_multihashmap_iterate(sensors, &init_sensor_reporting, NULL);
return GNUNET_OK;
}