config fix
[oweals/gnunet.git] / src / sensor / gnunet-service-sensor-reporting.c
index 7b32c7f6cf391b145d05fac641093d1fc0ad6b06..2292649e2ba42675ebed202e670552a0f53bd060 100644 (file)
@@ -23,6 +23,7 @@
  * @brief sensor service reporting functionality
  * @author Omar Tarabai
  */
+#include <inttypes.h>
 #include "platform.h"
 #include "gnunet_util_lib.h"
 #include "sensor.h"
 
 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
 
+/**
+ * Retry interval (seconds) in case channel to collection point is busy
+ */
+#define COLLECTION_RETRY 1
+
 /**
  * Context of reporting operations
  */
@@ -55,7 +61,7 @@ struct ReportingContext
 
   /**
    * Collection point reporting task
-   * (OR GNUNET_SCHEDULER_NO_TASK)
+   * (or #GNUNET_SCHEDULER_NO_TASK)
    */
   GNUNET_SCHEDULER_TaskIdentifier cp_task;
 
@@ -70,7 +76,7 @@ struct ReportingContext
   void *last_value;
 
   /**
-   * Size of @last_value
+   * Size of @last_value
    */
   size_t last_value_size;
 
@@ -119,16 +125,21 @@ struct CadetChannelContext
   void *pending_msg;
 
   /**
-   * Size of @pending_msg
+   * Size of @pending_msg
    */
   size_t pending_msg_size;
 
   /**
    * Handle to CADET tranmission request in case we are sending
-   * (sending == GNUNET_YES)
+   * (sending == #GNUNET_YES)
    */
   struct GNUNET_CADET_TransmitHandle *th;
 
+  /**
+   * Are we currently destroying the channel and its context?
+   */
+  int destroying;
+
 };
 
 /**
@@ -202,6 +213,7 @@ destroy_reporting_context (struct ReportingContext *rc)
 static void
 destroy_cadet_channel_context (struct CadetChannelContext *cc)
 {
+  cc->destroying = GNUNET_YES;
   if (NULL != cc->th)
   {
     GNUNET_CADET_notify_transmit_ready_cancel (cc->th);
@@ -220,15 +232,18 @@ destroy_cadet_channel_context (struct CadetChannelContext *cc)
   GNUNET_free (cc);
 }
 
+
 /**
  * Stop sensor reporting module
  */
-void SENSOR_reporting_stop ()
+void
+SENSOR_reporting_stop ()
 {
   struct ReportingContext *rc;
   struct CadetChannelContext *cc;
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor reporting module.\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Stopping sensor reporting module.\n");
   while (NULL != cc_head)
   {
     cc = cc_head;
@@ -243,7 +258,7 @@ void SENSOR_reporting_stop ()
   }
   if (NULL != peerstore)
   {
-    GNUNET_PEERSTORE_disconnect (peerstore);
+    GNUNET_PEERSTORE_disconnect (peerstore, GNUNET_YES);
     peerstore = NULL;
   }
   if (NULL != cadet)
@@ -253,6 +268,7 @@ void SENSOR_reporting_stop ()
   }
 }
 
+
 /**
  * Returns CADET channel established to given peer
  * or creates a new one
@@ -280,10 +296,12 @@ get_cadet_channel (struct GNUNET_PeerIdentity pid)
       GNUNET_CADET_OPTION_DEFAULT);
   cc->pid = pid;
   cc->sending = GNUNET_NO;
+  cc->destroying = GNUNET_NO;
   GNUNET_CONTAINER_DLL_insert (cc_head, cc_tail, cc);
   return cc;
 }
 
+
 /**
  * Construct a reading message ready to be sent over CADET channel
  *
@@ -293,25 +311,25 @@ get_cadet_channel (struct GNUNET_PeerIdentity pid)
  */
 static size_t
 construct_reading_message (struct ReportingContext *rc,
-    struct GNUNET_SENSOR_Reading **msg)
+                           struct GNUNET_SENSOR_ReadingMessage **msg)
 {
-  struct GNUNET_SENSOR_Reading *ret;
-  size_t sensorname_size;
-  size_t total_size;
+  struct GNUNET_SENSOR_ReadingMessage *ret;
+  uint16_t sensorname_size;
+  uint16_t total_size;
   void *dummy;
 
   sensorname_size = strlen (rc->sensor->name) + 1;
-  total_size = sizeof(struct GNUNET_SENSOR_Reading) +
+  total_size = sizeof(struct GNUNET_SENSOR_ReadingMessage) +
       sensorname_size +
       rc->last_value_size;
   ret = GNUNET_malloc (total_size);
-  ret->header->size = htons (total_size);
-  ret->header->type = htons (GNUNET_MESSAGE_TYPE_SENSOR_READING);
-  ret->sensorname_size = GNUNET_htobe64 (sensorname_size);
+  ret->header.size = htons (total_size);
+  ret->header.type = htons (GNUNET_MESSAGE_TYPE_SENSOR_READING);
+  ret->sensorname_size = htons (sensorname_size);
   ret->sensorversion_major = htons (rc->sensor->version_major);
   ret->sensorversion_minor = htons (rc->sensor->version_minor);
   ret->timestamp = GNUNET_htobe64 (rc->timestamp);
-  ret->value_size = GNUNET_htobe64 (rc->last_value_size);
+  ret->value_size = htons (rc->last_value_size);
   dummy = &ret[1];
   memcpy (dummy, rc->sensor->name, sensorname_size);
   dummy += sensorname_size;
@@ -320,6 +338,7 @@ construct_reading_message (struct ReportingContext *rc,
   return total_size;
 }
 
+
 /**
  * Function called to notify a client about the connection begin ready
  * to queue more data.  @a buf will be NULL and @a size zero if the
@@ -336,12 +355,20 @@ do_report_collection_point (void *cls, size_t size, void *buf)
   struct CadetChannelContext *cc = cls;
   size_t written = 0;
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Copying to CADET transmit buffer.\n");
+  cc->th = NULL;
   cc->sending = GNUNET_NO;
-  if (NULL == buf || size != cc->pending_msg_size)
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Copying to CADET transmit buffer.\n");
+  if (NULL == buf)
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
-        "CADET failed to transmit message to collection point, discarding.");
+        "CADET failed to transmit message (NULL buf), discarding.\n");
+  }
+  else if (size < cc->pending_msg_size)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+        "CADET failed to transmit message (small size, expected: %u, got: %u)"
+        ", discarding.\n", cc->pending_msg_size, size);
   }
   else
   {
@@ -349,45 +376,54 @@ do_report_collection_point (void *cls, size_t size, void *buf)
     written = cc->pending_msg_size;
   }
   GNUNET_free (cc->pending_msg);
+  cc->pending_msg = NULL;
   cc->pending_msg_size = 0;
   return written;
 }
 
+
 /**
  * Task scheduled to send values to collection point
  *
- * @param cls closure, a 'struct CollectionReportingContext *'
+ * @param cls closure, a `struct CollectionReportingContext *`
  * @param tc unused
  */
-static void report_collection_point
-(void *cls, const struct GNUNET_SCHEDULER_TaskContext* tc)
+static void
+report_collection_point (void *cls,
+                         const struct GNUNET_SCHEDULER_TaskContext* tc)
 {
   struct ReportingContext *rc = cls;
   struct SensorInfo *sensor = rc->sensor;
   struct CadetChannelContext *cc;
-  struct GNUNET_SENSOR_Reading *msg;
+  struct GNUNET_SENSOR_ReadingMessage *msg;
   size_t msg_size;
 
   rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
   if (0 == rc->last_value_size) /* Did not receive a sensor value yet */
   {
-    LOG (GNUNET_ERROR_TYPE_WARNING, "Did not receive a value from `%s' "
-        "to report yet.\n", rc->sensor->name);
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "Did not receive a value from `%s' to report yet.\n",
+         rc->sensor->name);
     rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
             &report_collection_point, rc);
     return;
   }
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Now trying to report last seen value of `%s' "
-      "to collection point.\n", rc->sensor->name);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Now trying to report last seen value of `%s' "
+       "to collection point.\n",
+       rc->sensor->name);
   GNUNET_assert (NULL != sensor->collection_point);
   cc = get_cadet_channel (*sensor->collection_point);
   if (GNUNET_YES == cc->sending)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-        "Cadet channel to collection point busy, "
-        "trying again for sensor `%s' on next interval.\n", rc->sensor->name);
-    rc->cp_task = GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
-            &report_collection_point, rc);
+         "Cadet channel to collection point busy, "
+         "trying again for sensor `%s' after %d seconds.\n",
+         rc->sensor->name,
+         COLLECTION_RETRY);
+    rc->cp_task = GNUNET_SCHEDULER_add_delayed (
+      GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, COLLECTION_RETRY),
+      &report_collection_point, rc);
     return;
   }
   msg_size = construct_reading_message (rc, &msg);
@@ -404,7 +440,8 @@ static void report_collection_point
       &report_collection_point, rc);
 }
 
-/*
+
+/**
  * Sensor value watch callback
  */
 static int
@@ -414,8 +451,6 @@ sensor_watch_cb (void *cls,
 {
   struct ReportingContext *rc = cls;
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a sensor `%s' watch value, "
-      "updating notification last_value.\n", rc->sensor->name);
   if (NULL != emsg)
     return GNUNET_YES;
   if (NULL != rc->last_value)
@@ -427,9 +462,15 @@ sensor_watch_cb (void *cls,
   memcpy (rc->last_value, record->value, record->value_size);
   rc->last_value_size = record->value_size;
   rc->timestamp = GNUNET_TIME_absolute_get().abs_value_us;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received a sensor `%s' watch value at "
+       "timestamp %" PRIu64 ", updating notification last_value.\n",
+       rc->sensor->name,
+       rc->timestamp);
   return GNUNET_YES;
 }
 
+
 /**
  * Iterator for defined sensors
  * Watches sensors for readings to report
@@ -485,6 +526,7 @@ init_sensor_reporting (void *cls,
   return GNUNET_YES;
 }
 
+
 /**
  * Function called whenever a channel is destroyed.  Should clean up
  * any associated state.
@@ -496,12 +538,15 @@ init_sensor_reporting (void *cls,
  * @param channel_ctx place where local state associated
  *                   with the channel is stored
  */
-static void cadet_channel_destroyed (void *cls,
-    const struct GNUNET_CADET_Channel *channel,
-    void *channel_ctx)
+static void
+cadet_channel_destroyed (void *cls,
+                         const struct GNUNET_CADET_Channel *channel,
+                         void *channel_ctx)
 {
   struct CadetChannelContext *cc = channel_ctx;
 
+  if (GNUNET_YES == cc->destroying)
+    return;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
       "Received a `channel destroyed' notification from CADET, "
       "cleaning up.\n");
@@ -519,13 +564,14 @@ static void cadet_channel_destroyed (void *cls,
  */
 int
 SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
-    struct GNUNET_CONTAINER_MultiHashMap *sensors)
+                        struct GNUNET_CONTAINER_MultiHashMap *sensors)
 {
   static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
       {NULL, 0, 0}
   };
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting sensor reporting module.\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Starting sensor reporting module.\n");
   GNUNET_assert(NULL != sensors);
   cfg = c;
   peerstore = GNUNET_PEERSTORE_connect(cfg);
@@ -545,13 +591,14 @@ SENSOR_reporting_start (const struct GNUNET_CONFIGURATION_Handle *c,
   if (NULL == cadet)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
-        _("Failed to connect to CADET service.\n"));
+         _("Failed to connect to CADET service.\n"));
     SENSOR_reporting_stop ();
     return GNUNET_SYSERR;
   }
-  GNUNET_CRYPTO_get_peer_identity(cfg, &mypeerid);
-  GNUNET_CONTAINER_multihashmap_iterate(sensors, &init_sensor_reporting, NULL);
-
+  GNUNET_CRYPTO_get_peer_identity (cfg,
+                                   &mypeerid);
+  GNUNET_CONTAINER_multihashmap_iterate(sensors,
+                                        &init_sensor_reporting, NULL);
   return GNUNET_OK;
 }