adapting testbed-logger to MQ API
authorChristian Grothoff <christian@grothoff.org>
Fri, 24 Jun 2016 14:34:07 +0000 (14:34 +0000)
committerChristian Grothoff <christian@grothoff.org>
Fri, 24 Jun 2016 14:34:07 +0000 (14:34 +0000)
src/include/gnunet_testbed_logger_service.h
src/nse/gnunet-service-nse.c
src/testbed-logger/test_testbed_logger_api.c
src/testbed-logger/testbed_logger_api.c

index c10c2ee6b99d75321505bca9c7c89e5a5bc1b819..4d7c3f7cb6ac516f83eb200660a2e785a8f3a6f1 100644 (file)
@@ -40,7 +40,7 @@ extern "C"
 #endif
 #endif
 
-#include "gnunet_configuration_lib.h"
+#include "gnunet_util_lib.h"
 
 /**
  * Opaque handle for the logging service
@@ -70,7 +70,7 @@ GNUNET_TESTBED_LOGGER_disconnect (struct GNUNET_TESTBED_LOGGER_Handle *h);
 
 /**
  * Functions of this type are called to notify a successful transmission of the
- * message to the logger service
+ * message to the logger service.
  *
  * @param cls the closure given to GNUNET_TESTBED_LOGGER_send()
  * @param size the amount of data sent
@@ -87,7 +87,7 @@ typedef void
  *
  * @param h the logger handle
  * @param data the data to send;
- * @param size how many bytes of data to send
+ * @param size how many bytes of @a data to send
  */
 void
 GNUNET_TESTBED_LOGGER_write (struct GNUNET_TESTBED_LOGGER_Handle *h,
@@ -99,13 +99,11 @@ GNUNET_TESTBED_LOGGER_write (struct GNUNET_TESTBED_LOGGER_Handle *h,
  * Flush the buffered data to the logger service
  *
  * @param h the logger handle
- * @param timeout how long to wait before calling the flust completion callback
  * @param cb the callback to call after the data is flushed
- * @param cb_cls the closure for the above callback
+ * @param cb_cls the closure for @a cb
  */
 void
 GNUNET_TESTBED_LOGGER_flush (struct GNUNET_TESTBED_LOGGER_Handle *h,
-                             struct GNUNET_TIME_Relative timeout,
                              GNUNET_TESTBED_LOGGER_FlushCompletion cb,
                              void *cb_cls);
 
index 4d920465c139d24c6238f399b09f855a5cc27357..3db33020469879f73069729c33617f5d633ccec9 100644 (file)
@@ -1386,9 +1386,9 @@ shutdown_task (void *cls)
   }
   if (NULL != lh)
   {
-    struct GNUNET_TIME_Relative timeout;
-    timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30);
-    GNUNET_TESTBED_LOGGER_flush (lh, timeout, &flush_comp_cb, NULL);
+    GNUNET_TESTBED_LOGGER_flush (lh,
+                                 &flush_comp_cb,
+                                 NULL);
   }
   if (NULL != histogram)
   {
index 8f7391f22c562d5e9f55b51bc2918203175f0ad3..0ebe0c3f42dbde102118a8c7a5a4686069b9d80d 100644 (file)
@@ -209,7 +209,6 @@ do_write (void *cls)
   if (0 == i++)
     return;
   GNUNET_TESTBED_LOGGER_flush (h,
-                              GNUNET_TIME_UNIT_FOREVER_REL,
                                &flush_comp,
                                &write_task);
 }
index aaf18cd3358eb1ef194f33abe92251b93c7c5fc7..869054cf3bacabfbf67f0593841bce4cd755acb7 100644 (file)
@@ -1,6 +1,6 @@
 /*
       This file is part of GNUnet
-      Copyright (C) 2008--2013 GNUnet e.V.
+      Copyright (C) 2008--2013, 2016 GNUnet e.V.
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -22,6 +22,7 @@
  * @file testbed-logger/testbed_logger_api.c
  * @brief Client-side routines for communicating with the tesbted logger service
  * @author Sree Harsha Totakura <sreeharsha@totakura.in>
+ * @author Christian Grothoff
  */
 
 #include "platform.h"
 #define LOG(kind, ...)                          \
   GNUNET_log_from (kind, "testbed-logger-api", __VA_ARGS__)
 
-/**
- * Debug logging
- */
-#define LOG_DEBUG(...)                          \
-  LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
-
-#ifdef GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD
-#undef GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD
-#endif
-
-/**
- * Threshold after which exponential backoff should not increase (15 s).
- */
-#define GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
 
 /**
  * The size of the buffer we fill before sending out the message
  */
-#define BUFFER_SIZE GNUNET_SERVER_MAX_MESSAGE_SIZE
-
-/**
- * The message queue for sending messages to the controller service
- */
-struct MessageQueue
-{
-  /**
-   * next pointer for DLL
-   */
-  struct MessageQueue *next;
-
-  /**
-   * prev pointer for DLL
-   */
-  struct MessageQueue *prev;
-
-  /**
-   * The message to be sent
-   */
-  struct GNUNET_MessageHeader *msg;
-
-  /**
-   * Completion callback
-   */
-  GNUNET_TESTBED_LOGGER_FlushCompletion cb;
-
-  /**
-   * callback closure
-   */
-  void *cb_cls;
-};
-
+#define BUFFER_SIZE (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct GNUNET_MessageHeader))
 
 /**
  * Connection handle for the logger service
@@ -94,22 +49,7 @@ struct GNUNET_TESTBED_LOGGER_Handle
   /**
    * Client connection
    */
-  struct GNUNET_CLIENT_Connection *client;
-
-  /**
-   * The transport handle
-   */
-  struct GNUNET_CLIENT_TransmitHandle *th;
-
-  /**
-   * DLL head for the message queue
-   */
-  struct MessageQueue *mq_head;
-
-  /**
-   * DLL tail for the message queue
-   */
-  struct MessageQueue *mq_tail;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
    * Flush completion callback
@@ -117,19 +57,19 @@ struct GNUNET_TESTBED_LOGGER_Handle
   GNUNET_TESTBED_LOGGER_FlushCompletion cb;
 
   /**
-   * Closure for the above callback
+   * Closure for @e cb
    */
   void *cb_cls;
 
   /**
    * Local buffer for data to be transmitted
    */
-  void *buf;
+  char buf[BUFFER_SIZE];
 
   /**
-   * The size of the local buffer
+   * How many bytes in @a buf are in use?
    */
-  size_t bs;
+  size_t buse;
 
   /**
    * Number of bytes wrote since last flush
@@ -144,28 +84,15 @@ struct GNUNET_TESTBED_LOGGER_Handle
   /**
    * Task to call the flush completion callback
    */
-  struct GNUNET_SCHEDULER_Task * flush_completion_task;
+  struct GNUNET_SCHEDULER_Task *flush_completion_task;
 
   /**
-   * Task to be executed when flushing takes too long
+   * Number of entries in the MQ.
    */
-  struct GNUNET_SCHEDULER_Task * timeout_flush_task;
+  unsigned int mq_len;
 };
 
 
-/**
- * Cancels the flush timeout task
- *
- * @param h handle to the logger
- */
-static void
-cancel_timeout_flush (struct GNUNET_TESTBED_LOGGER_Handle *h)
-{
-  GNUNET_SCHEDULER_cancel (h->timeout_flush_task);
-  h->timeout_flush_task = NULL;
-}
-
-
 /**
  * Task to call the flush completion notification
  *
@@ -186,8 +113,6 @@ call_flush_completion (void *cls)
   h->cb = NULL;
   cb_cls = h->cb_cls;
   h->cb_cls = NULL;
-  if (NULL != h->timeout_flush_task)
-    cancel_timeout_flush (h);
   if (NULL != cb)
     cb (cb_cls, bw);
 }
@@ -203,97 +128,39 @@ trigger_flush_notification (struct GNUNET_TESTBED_LOGGER_Handle *h)
 {
   if (NULL != h->flush_completion_task)
     GNUNET_SCHEDULER_cancel (h->flush_completion_task);
-  h->flush_completion_task = GNUNET_SCHEDULER_add_now (&call_flush_completion, h);
+  h->flush_completion_task
+    = GNUNET_SCHEDULER_add_now (&call_flush_completion,
+                                h);
 }
 
 
 /**
- * Function called to notify a client about the connection begin ready to queue
- * more data.  "buf" will be NULL and "size" zero if the connection was closed
- * for writing in the meantime.
+ * Send the buffered data to the service
  *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param h the logger handle
  */
-static size_t
-transmit_ready_notify (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
-  struct MessageQueue *mq;
-
-  h->th = NULL;
-  mq = h->mq_head;
-  GNUNET_assert (NULL != mq);
-  if ((0 == size) && (NULL == buf))     /* Timeout */
-  {
-    LOG_DEBUG ("Message sending timed out -- retrying\n");
-    h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
-    h->th =
-        GNUNET_CLIENT_notify_transmit_ready (h->client,
-                                             ntohs (mq->msg->size),
-                                             h->retry_backoff, GNUNET_YES,
-                                             &transmit_ready_notify, h);
-    return 0;
-  }
-  h->retry_backoff = GNUNET_TIME_UNIT_ZERO;
-  GNUNET_assert (ntohs (mq->msg->size) <= size);
-  size = ntohs (mq->msg->size);
-  memcpy (buf, mq->msg, size);
-  LOG_DEBUG ("Message of type: %u and size: %u sent\n",
-             ntohs (mq->msg->type), size);
-  GNUNET_free (mq->msg);
-  GNUNET_CONTAINER_DLL_remove (h->mq_head, h->mq_tail, mq);
-  GNUNET_free (mq);
-  h->bwrote += (size - sizeof (struct GNUNET_MessageHeader));
-  mq = h->mq_head;
-  if (NULL != mq)
-  {
-    h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
-    h->th =
-        GNUNET_CLIENT_notify_transmit_ready (h->client,
-                                             ntohs (mq->msg->size),
-                                             h->retry_backoff, GNUNET_YES,
-                                             &transmit_ready_notify, h);
-    return size;
-  }
-  if (NULL != h->cb)
-    trigger_flush_notification (h);       /* Call the flush completion callback */
-  return size;
-}
+static void
+dispatch_buffer (struct GNUNET_TESTBED_LOGGER_Handle *h);
 
 
 /**
- * Queues a message in send queue of the logger handle
+ * MQ successfully sent a message.
  *
- * @param h the logger handle
- * @param msg the message to queue
+ * @param cls our handle
  */
 static void
-queue_message (struct GNUNET_TESTBED_LOGGER_Handle *h,
-               struct GNUNET_MessageHeader *msg)
+notify_sent (void *cls)
 {
-  struct MessageQueue *mq;
-  uint16_t type;
-  uint16_t size;
-
-  type = ntohs (msg->type);
-  size = ntohs (msg->size);
-  mq = GNUNET_new (struct MessageQueue);
-  mq->msg = msg;
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Queueing message of type %u, size %u for sending\n", type,
-       ntohs (msg->size));
-  GNUNET_CONTAINER_DLL_insert_tail (h->mq_head, h->mq_tail, mq);
-  if (NULL == h->th)
+  struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
+
+  h->mq_len--;
+  if ( (0 == h->mq_len) &&
+       (NULL != h->cb) )
   {
-    h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
-    h->th =
-        GNUNET_CLIENT_notify_transmit_ready (h->client, size,
-                                             h->retry_backoff, GNUNET_YES,
-                                             &transmit_ready_notify,
-                                             h);
+    if (0 == h->buse)
+      trigger_flush_notification (h);
+    else
+      dispatch_buffer (h);
   }
 }
 
@@ -307,16 +174,40 @@ static void
 dispatch_buffer (struct GNUNET_TESTBED_LOGGER_Handle *h)
 {
   struct GNUNET_MessageHeader *msg;
-  size_t msize;
-
-  msize = sizeof (struct GNUNET_MessageHeader) + h->bs;
-  msg = GNUNET_realloc (h->buf, msize);
-  h->buf = NULL;
-  memmove (&msg[1], msg, h->bs);
-  h->bs = 0;
-  msg->type = htons (GNUNET_MESSAGE_TYPE_TESTBED_LOGGER_MSG);
-  msg->size = htons (msize);
-  queue_message (h, msg);
+  struct GNUNET_MQ_Envelope *env;
+
+  env = GNUNET_MQ_msg_extra (msg,
+                             h->buse,
+                             GNUNET_MESSAGE_TYPE_TESTBED_LOGGER_MSG);
+  memcpy (&msg[1],
+          h->buf,
+          h->buse);
+  h->bwrote += h->buse;
+  h->buse = 0;
+  h->mq_len++;
+  GNUNET_MQ_notify_sent (env,
+                         &notify_sent,
+                         h);
+  GNUNET_MQ_send (h->mq,
+                  env);
+}
+
+
+/**
+ * We got disconnected from the logger.  Stop logging.
+  *
+ * @param cls the `struct GNUNET_TESTBED_LOGGER_Handle`
+ * @param error error code
+  */
+static void
+mq_error_handler (void *cls,
+                  enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
+
+  GNUNET_break (0);
+  GNUNET_MQ_destroy (h->mq);
+  h->mq = NULL;
 }
 
 
@@ -331,13 +222,18 @@ struct GNUNET_TESTBED_LOGGER_Handle *
 GNUNET_TESTBED_LOGGER_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   struct GNUNET_TESTBED_LOGGER_Handle *h;
-  struct GNUNET_CLIENT_Connection *client;
 
-  client = GNUNET_CLIENT_connect ("testbed-logger", cfg);
-  if (NULL == client)
-    return NULL;
   h = GNUNET_new (struct GNUNET_TESTBED_LOGGER_Handle);
-  h->client = client;
+  h->mq = GNUNET_CLIENT_connecT (cfg,
+                                 "testbed-logger",
+                                 NULL,
+                                 &mq_error_handler,
+                                 h);
+  if (NULL == h->mq)
+  {
+    GNUNET_free (h);
+    return NULL;
+  }
   return h;
 }
 
@@ -350,23 +246,20 @@ GNUNET_TESTBED_LOGGER_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
 void
 GNUNET_TESTBED_LOGGER_disconnect (struct GNUNET_TESTBED_LOGGER_Handle *h)
 {
-  struct MessageQueue *mq;
-  unsigned int lost;
-
   if (NULL != h->flush_completion_task)
+  {
     GNUNET_SCHEDULER_cancel (h->flush_completion_task);
-  lost = 0;
-  while (NULL != (mq = h->mq_head))
+    h->flush_completion_task = NULL;
+  }
+  if (0 != h->mq_len)
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         "Disconnect lost %u logger message[s]\n",
+         h->mq_len);
+  if (NULL != h->mq)
   {
-    GNUNET_CONTAINER_DLL_remove (h->mq_head, h->mq_tail, mq);
-    GNUNET_free (mq->msg);
-    GNUNET_free (mq);
-    lost++;
+    GNUNET_MQ_destroy (h->mq);
+    h->mq = NULL;
   }
-  if (0 != lost)
-    LOG (GNUNET_ERROR_TYPE_WARNING, "Cleaning up %u unsent logger message[s]\n",
-         lost);
-  GNUNET_CLIENT_disconnect (h->client);
   GNUNET_free (h);
 }
 
@@ -378,63 +271,28 @@ GNUNET_TESTBED_LOGGER_disconnect (struct GNUNET_TESTBED_LOGGER_Handle *h)
  *
  * @param h the logger handle
  * @param data the data to send;
- * @param size how many bytes of data to send
+ * @param size how many bytes of @a data to send
  */
 void
 GNUNET_TESTBED_LOGGER_write (struct GNUNET_TESTBED_LOGGER_Handle *h,
-                             const void *data, size_t size)
+                             const void *data,
+                             size_t size)
 {
-  size_t fit_size;
-
-  GNUNET_assert (0 != size);
-  GNUNET_assert (NULL != data);
-  GNUNET_assert (size <= (BUFFER_SIZE - sizeof (struct GNUNET_MessageHeader)));
-  fit_size = sizeof (struct GNUNET_MessageHeader) + h->bs + size;
-  if ( BUFFER_SIZE < fit_size )
-    dispatch_buffer (h);
-  if (NULL == h->buf)
-  {
-    h->buf = GNUNET_malloc (size);
-    h->bs = size;
-    memcpy (h->buf, data, size);
-    goto dispatch_ready;
-  }
-  h->buf = GNUNET_realloc (h->buf, h->bs + size);
-  memcpy (h->buf + h->bs, data, size);
-  h->bs += size;
-
- dispatch_ready:
-  if (BUFFER_SIZE == fit_size)
-    dispatch_buffer (h);
-}
-
-
-/**
- * Task to be executed when flushing our local buffer takes longer than timeout
- * given to GNUNET_TESTBED_LOGGER_flush().  The flush completion callback will
- * be called with 0 as the amount of data sent.
- *
- * @param cls the logger handle
- */
-static void
-timeout_flush (void *cls)
-{
-  struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
-  GNUNET_TESTBED_LOGGER_FlushCompletion cb;
-  void *cb_cls;
-
-  h->timeout_flush_task = NULL;
-  cb = h->cb;
-  h->cb = NULL;
-  cb_cls = h->cb_cls;
-  h->cb_cls = NULL;
-  if (NULL != h->flush_completion_task)
+  if (NULL == h->mq)
+    return;
+  while (0 != size)
   {
-    GNUNET_SCHEDULER_cancel (h->flush_completion_task);
-    h->flush_completion_task = NULL;
+    size_t fit_size = GNUNET_MIN (size,
+                                  BUFFER_SIZE - h->buse);
+    memcpy (&h->buf[h->buse],
+            data,
+            fit_size);
+    h->buse += fit_size;
+    data += fit_size;
+    size -= fit_size;
+    if (0 != size)
+      dispatch_buffer (h);
   }
-  if (NULL != cb)
-    cb (cb_cls, 0);
 }
 
 
@@ -442,22 +300,19 @@ timeout_flush (void *cls)
  * Flush the buffered data to the logger service
  *
  * @param h the logger handle
- * @param timeout how long to wait before calling the flust completion callback
  * @param cb the callback to call after the data is flushed
  * @param cb_cls the closure for the above callback
  */
 void
 GNUNET_TESTBED_LOGGER_flush (struct GNUNET_TESTBED_LOGGER_Handle *h,
-                             struct GNUNET_TIME_Relative timeout,
                              GNUNET_TESTBED_LOGGER_FlushCompletion cb,
                              void *cb_cls)
 {
+  GNUNET_assert (NULL == h->cb);
   h->cb = cb;
   h->cb_cls = cb_cls;
-  GNUNET_assert (NULL == h->timeout_flush_task);
-  h->timeout_flush_task =
-      GNUNET_SCHEDULER_add_delayed (timeout, &timeout_flush, h);
-  if (NULL == h->buf)
+  if ( (NULL == h->mq) ||
+       (NULL == h->buf) )
   {
     trigger_flush_notification (h);
     return;
@@ -481,8 +336,6 @@ GNUNET_TESTBED_LOGGER_flush_cancel (struct GNUNET_TESTBED_LOGGER_Handle *h)
     GNUNET_SCHEDULER_cancel (h->flush_completion_task);
     h->flush_completion_task = NULL;
   }
-  if (NULL != h->timeout_flush_task)
-    cancel_timeout_flush (h);
   h->cb = NULL;
   h->cb_cls = NULL;
 }