-keep track of messages passed to mq
[oweals/gnunet.git] / src / sensor / gnunet-service-sensor_reporting.c
index 5e0972c5e964b601f644a747b24e1eb61e812b6a..4ea570ccd4e2a89899da29fb06f2a8b50bbde1b3 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C)
+     Copyright (C)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -14,8 +14,8 @@
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 
 /**
 
 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
 
+/**
+ * Retry time when failing to connect to collection point
+ */
+#define CP_RETRY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1)
+
 
 /**
  * When we are still generating a proof-of-work and we need to send an anomaly
@@ -157,9 +162,9 @@ struct ValueInfo
   struct GNUNET_PEERSTORE_WatchContext *wc;
 
   /**
-   * Collection point reporting task (or #GNUNET_SCHEDULER_NO_TASK)
+   * Collection point reporting task (or NULL)
    */
-  GNUNET_SCHEDULER_TaskIdentifier reporting_task;
+  struct GNUNET_SCHEDULER_Task *reporting_task;
 
 };
 
@@ -224,6 +229,16 @@ struct CadetPeer
    */
   struct GNUNET_MQ_Handle *mq;
 
+  /**
+   * CADET transmit handle
+   */
+  struct GNUNET_CADET_TransmitHandle *th;
+
+  /**
+   * Task used to try reconnection to collection point after failure
+   */
+  struct GNUNET_SCHEDULER_Task * reconnect_task;
+
   /**
    * Are we currently destroying the channel and its context?
    */
@@ -324,6 +339,13 @@ static long long unsigned int pow_matching_bits;
 
 
 
+/**
+ * Try reconnecting to collection point and send last queued message
+ */
+static void
+cp_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
 /******************************************************************************/
 /******************************      CLEANUP     ******************************/
 /******************************************************************************/
@@ -378,10 +400,10 @@ destroy_value_info (struct ValueInfo *vi)
     GNUNET_PEERSTORE_watch_cancel (vi->wc);
     vi->wc = NULL;
   }
-  if (GNUNET_SCHEDULER_NO_TASK != vi->reporting_task)
+  if (NULL != vi->reporting_task)
   {
     GNUNET_SCHEDULER_cancel (vi->reporting_task);
-    vi->reporting_task = GNUNET_SCHEDULER_NO_TASK;
+    vi->reporting_task = NULL;
   }
   if (NULL != vi->last_value)
   {
@@ -401,20 +423,33 @@ static void
 destroy_core_peer (struct CorePeer *corep)
 {
   struct AnomalyInfo *ai;
+  struct AnomalyReportingQueueItem *ar_item;
 
-  if (NULL != corep->mq)
-  {
-    GNUNET_MQ_destroy (corep->mq);
-    corep->mq = NULL;
-  }
   ai = ai_head;
   while (NULL != ai)
   {
     GNUNET_assert (NULL != ai->anomalous_neighbors);
     GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors,
                                               corep->peer_id);
+    /* Remove the core peer from any reporting queues */
+    ar_item = ai->reporting_queue_head;
+    while (NULL != ar_item)
+    {
+      if (ar_item->dest_mq == corep->mq)
+      {
+        GNUNET_CONTAINER_DLL_remove (ai->reporting_queue_head,
+                                     ai->reporting_queue_tail, ar_item);
+        break;
+      }
+      ar_item = ar_item->next;
+    }
     ai = ai->next;
   }
+  if (NULL != corep->mq)
+  {
+    GNUNET_MQ_destroy (corep->mq);
+    corep->mq = NULL;
+  }
   GNUNET_free (corep);
 }
 
@@ -428,6 +463,11 @@ static void
 destroy_cadet_peer (struct CadetPeer *cadetp)
 {
   cadetp->destroying = GNUNET_YES;
+  if (NULL != cadetp->reconnect_task)
+  {
+    GNUNET_SCHEDULER_cancel (cadetp->reconnect_task);
+    cadetp->reconnect_task = NULL;
+  }
   if (NULL != cadetp->mq)
   {
     GNUNET_MQ_destroy (cadetp->mq);
@@ -535,6 +575,142 @@ get_anomaly_info_by_sensor (struct GNUNET_SENSOR_SensorInfo *sensor)
 }
 
 
+/**
+ * Function called to notify a client about the connection
+ * begin ready to queue more data.  "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
+ *
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+cp_mq_ntr (void *cls, size_t size, void *buf)
+{
+  struct CadetPeer *cadetp = cls;
+  const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (cadetp->mq);
+  uint16_t msize;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "cp_mq_ntr()\n");
+  cadetp->th = NULL;
+  if (NULL == buf)
+  {
+    LOG (GNUNET_ERROR_TYPE_INFO,
+         "Sending anomaly report to collection point failed."
+         " Retrying connection in %s.\n",
+         GNUNET_STRINGS_relative_time_to_string (CP_RETRY, GNUNET_NO));
+    cadetp->reconnect_task =
+        GNUNET_SCHEDULER_add_delayed (CP_RETRY, &cp_reconnect, cadetp);
+    return 0;
+  }
+  msize = ntohs (msg->size);
+  GNUNET_assert (msize <= size);
+  memcpy (buf, msg, msize);
+  GNUNET_MQ_impl_send_continue (cadetp->mq);
+  return msize;
+}
+
+
+/**
+ * Try reconnecting to collection point and send last queued message
+ */
+static void
+cp_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct CadetPeer *cadetp = cls;
+  const struct GNUNET_MessageHeader *msg;
+
+  LOG (GNUNET_ERROR_TYPE_INFO,
+       "Retrying connection to collection point `%s'.\n",
+       GNUNET_i2s (&cadetp->peer_id));
+  cadetp->reconnect_task = NULL;
+  GNUNET_assert (NULL == cadetp->channel);
+  cadetp->channel =
+      GNUNET_CADET_channel_create (cadet, cadetp, &cadetp->peer_id,
+                                   GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
+                                   GNUNET_CADET_OPTION_RELIABLE);
+  msg = GNUNET_MQ_impl_current (cadetp->mq);
+  cadetp->th =
+      GNUNET_CADET_notify_transmit_ready (cadetp->channel, GNUNET_NO,
+                                          GNUNET_TIME_UNIT_FOREVER_REL,
+                                          ntohs (msg->size), cp_mq_ntr, cadetp);
+}
+
+
+/**
+ * Signature of functions implementing the
+ * sending functionality of a message queue.
+ *
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state state of the implementation
+ */
+static void
+cp_mq_send_impl (struct GNUNET_MQ_Handle *mq,
+                 const struct GNUNET_MessageHeader *msg, void *impl_state)
+{
+  struct CadetPeer *cadetp = impl_state;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "cp_mq_send_impl()\n");
+  GNUNET_assert (NULL == cadetp->th);
+  if (NULL == cadetp->channel)
+  {
+    LOG (GNUNET_ERROR_TYPE_INFO,
+         "Sending anomaly report to collection point failed."
+         " Retrying connection in %s.\n",
+         GNUNET_STRINGS_relative_time_to_string (CP_RETRY, GNUNET_NO));
+    cadetp->reconnect_task =
+        GNUNET_SCHEDULER_add_delayed (CP_RETRY, &cp_reconnect, cadetp);
+    return;
+  }
+  cadetp->th =
+      GNUNET_CADET_notify_transmit_ready (cadetp->channel, GNUNET_NO,
+                                          GNUNET_TIME_UNIT_FOREVER_REL,
+                                          ntohs (msg->size), cp_mq_ntr, cadetp);
+}
+
+
+/**
+ * Signature of functions implementing the
+ * destruction of a message queue.
+ * Implementations must not free 'mq', but should
+ * take care of 'impl_state'.
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state state of the implementation
+ */
+static void
+cp_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
+{
+  struct CadetPeer *cp = impl_state;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "cp_mq_destroy_impl()\n");
+  if (NULL != cp->th)
+  {
+    GNUNET_CADET_notify_transmit_ready_cancel (cp->th);
+    cp->th = NULL;
+  }
+}
+
+
+/**
+ * Create the message queue used to send messages to a collection point.
+ * This will be used to make sure that the message are queued even if the
+ * connection to the collection point can not be established at the moment.
+ *
+ * @param cp CadetPeer information struct
+ * @return Message queue handle
+ */
+static struct GNUNET_MQ_Handle *
+cp_mq_create (struct CadetPeer *cp)
+{
+  return GNUNET_MQ_queue_for_callbacks (cp_mq_send_impl, cp_mq_destroy_impl,
+                                        NULL, cp, NULL, NULL, NULL);
+}
+
+
 /**
  * Returns context of a connected CADET peer.
  * Creates it first if didn't exist before.
@@ -563,7 +739,8 @@ get_cadet_peer (struct GNUNET_PeerIdentity pid)
       GNUNET_CADET_channel_create (cadet, cadetp, &pid,
                                    GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
                                    GNUNET_CADET_OPTION_RELIABLE);
-  cadetp->mq = GNUNET_CADET_mq_create (cadetp->channel);
+  cadetp->mq = cp_mq_create (cadetp);
+  cadetp->reconnect_task = NULL;
   GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp);
   return cadetp;
 }
@@ -658,7 +835,10 @@ report_creation_cb (void *cls, struct GNUNET_SENSOR_crypto_pow_block *block)
     return;
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Anomaly report POW block ready.\n");
-  ai->report_block = block;
+  ai->report_block =
+      GNUNET_memdup (block,
+                     sizeof (struct GNUNET_SENSOR_crypto_pow_block) +
+                     block->msg_size);
   ar_item = ai->reporting_queue_head;
   while (NULL != ar_item)
   {
@@ -717,6 +897,7 @@ update_anomaly_report_pow_block (struct AnomalyInfo *ai)
                                      &timestamp, &mypeerid.public_key,
                                      private_key, pow_matching_bits,
                                      &report_creation_cb, ai);
+  GNUNET_free (arm);
 }
 
 
@@ -851,10 +1032,12 @@ handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other,
  * @param cls Closure, ValueInfo struct related to the sensor we are watching
  * @param record PEERSTORE new record, NULL if error
  * @param emsg Error message, NULL if no error
- * @return GNUNET_YES to continue watching
+ * @return #GNUNET_YES to continue watching
  */
 static int
-value_watch_cb (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg)
+value_watch_cb (void *cls,
+                const struct GNUNET_PEERSTORE_Record *record,
+                const char *emsg)
 {
   struct ValueInfo *vi = cls;
 
@@ -1005,9 +1188,12 @@ cadet_channel_destroyed (void *cls, const struct GNUNET_CADET_Channel *channel,
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "CADET channel was destroyed by remote peer `%s' or failed to start.\n",
        GNUNET_i2s (&cadetp->peer_id));
-  GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
+  if (NULL != cadetp->th)
+  {
+    GNUNET_CADET_notify_transmit_ready_cancel (cadetp->th);
+    cadetp->th = NULL;
+  }
   cadetp->channel = NULL;
-  destroy_cadet_peer (cadetp);
 }
 
 
@@ -1094,6 +1280,12 @@ report_value (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
        "Now trying to report last seen value of `%s' to collection point.\n",
        sensor->name);
   cadetp = get_cadet_peer (*sensor->collection_point);
+  if (NULL == cadetp->channel)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "Trying to send value to collection point but connection failed, discarding.\n");
+    return;
+  }
   ev = create_value_message (vi);
   GNUNET_MQ_send (cadetp->mq, ev);
   vi->last_value_reported = GNUNET_YES;