-improve UDP logging
[oweals/gnunet.git] / src / testbed / testbed_logger_api.c
index 674a170d36e5725ee9405c0dcf3c30655d5e4835..f753b2e7ca348b8a9581aa9a4a6260d448d88d60 100644 (file)
@@ -1,6 +1,6 @@
 /*
       This file is part of GNUnet
-      (C) 2008--2013 Christian Grothoff (and other contributing authors)
+      Copyright (C) 2008--2013 Christian Grothoff (and other contributing authors)
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -21,7 +21,7 @@
 /**
  * @file testbed/testbed_logger_api.c
  * @brief Client-side routines for communicating with the tesbted logger service
- * @author Sree Harsha Totakura <sreeharsha@totakura.in> 
+ * @author Sree Harsha Totakura <sreeharsha@totakura.in>
  */
 
 #include "platform.h"
  */
 #define GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
 
+/**
+ * The size of the buffer we fill before sending out the message
+ */
+#define BUFFER_SIZE GNUNET_SERVER_MAX_MESSAGE_SIZE
 
 /**
  * The message queue for sending messages to the controller service
@@ -92,6 +96,9 @@ struct GNUNET_TESTBED_LOGGER_Handle
    */
   struct GNUNET_CLIENT_Connection *client;
 
+  /**
+   * The transport handle
+   */
   struct GNUNET_CLIENT_TransmitHandle *th;
 
   /**
@@ -104,22 +111,61 @@ struct GNUNET_TESTBED_LOGGER_Handle
    */
   struct MessageQueue *mq_tail;
 
-  GNUNET_SCHEDULER_TaskIdentifier flush_completion_task;
-
+  /**
+   * Flush completion callback
+   */
   GNUNET_TESTBED_LOGGER_FlushCompletion cb;
 
+  /**
+   * Closure for the above callback
+   */
   void *cb_cls;
 
+  /**
+   * Local buffer for data to be transmitted
+   */
   void *buf;
 
+  /**
+   * The size of the local buffer
+   */
   size_t bs;
 
+  /**
+   * Number of bytes wrote since last flush
+   */
   size_t bwrote;
 
+  /**
+   * How long after should we retry sending a message to the service?
+   */
   struct GNUNET_TIME_Relative retry_backoff;
+
+  /**
+   * Task to call the flush completion callback
+   */
+  struct GNUNET_SCHEDULER_Task * flush_completion_task;
+
+  /**
+   * Task to be executed when flushing takes too long
+   */
+  struct GNUNET_SCHEDULER_Task * timeout_flush_task;
 };
 
 
+/**
+ * Cancels the flush timeout task
+ *
+ * @param h handle to the logger
+ */
+static void
+cancel_timeout_flush (struct GNUNET_TESTBED_LOGGER_Handle *h)
+{
+  GNUNET_SCHEDULER_cancel (h->timeout_flush_task);
+  h->timeout_flush_task = NULL;
+}
+
+
 /**
  * Task to call the flush completion notification
  *
@@ -129,18 +175,20 @@ struct GNUNET_TESTBED_LOGGER_Handle
 static void
 call_flush_completion (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct GNUNET_TESTBED_LOGGER_Handle *h = cls; 
+  struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
   GNUNET_TESTBED_LOGGER_FlushCompletion cb;
   void *cb_cls;
   size_t bw;
 
-  h->flush_completion_task = GNUNET_SCHEDULER_NO_TASK;
+  h->flush_completion_task = NULL;
   bw = h->bwrote;
   h->bwrote = 0;
   cb = h->cb;
   h->cb = NULL;
   cb_cls = h->cb_cls;
   h->cb_cls = NULL;
+  if (NULL != h->timeout_flush_task)
+    cancel_timeout_flush (h);
   if (NULL != cb)
     cb (cb_cls, bw);
 }
@@ -154,7 +202,7 @@ call_flush_completion (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 static void
 trigger_flush_notification (struct GNUNET_TESTBED_LOGGER_Handle *h)
 {
-  if (GNUNET_SCHEDULER_NO_TASK != h->flush_completion_task)
+  if (NULL != h->flush_completion_task)
     GNUNET_SCHEDULER_cancel (h->flush_completion_task);
   h->flush_completion_task = GNUNET_SCHEDULER_add_now (&call_flush_completion, h);
 }
@@ -233,7 +281,7 @@ queue_message (struct GNUNET_TESTBED_LOGGER_Handle *h,
 
   type = ntohs (msg->type);
   size = ntohs (msg->size);
-  mq = GNUNET_malloc (sizeof (struct MessageQueue));
+  mq = GNUNET_new (struct MessageQueue);
   mq->msg = msg;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Queueing message of type %u, size %u for sending\n", type,
@@ -266,7 +314,7 @@ dispatch_buffer (struct GNUNET_TESTBED_LOGGER_Handle *h)
   msg = GNUNET_realloc (h->buf, msize);
   h->buf = NULL;
   memmove (&msg[1], msg, h->bs);
-  h->bs = 0;    
+  h->bs = 0;
   msg->type = htons (GNUNET_MESSAGE_TYPE_TESTBED_LOGGER_MSG);
   msg->size = htons (msize);
   queue_message (h, msg);
@@ -285,11 +333,11 @@ GNUNET_TESTBED_LOGGER_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   struct GNUNET_TESTBED_LOGGER_Handle *h;
   struct GNUNET_CLIENT_Connection *client;
-  
+
   client = GNUNET_CLIENT_connect ("testbed-logger", cfg);
   if (NULL == client)
     return NULL;
-  h = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_LOGGER_Handle));
+  h = GNUNET_new (struct GNUNET_TESTBED_LOGGER_Handle);
   h->client = client;
   return h;
 }
@@ -304,15 +352,21 @@ void
 GNUNET_TESTBED_LOGGER_disconnect (struct GNUNET_TESTBED_LOGGER_Handle *h)
 {
   struct MessageQueue *mq;
+  unsigned int lost;
 
-  if (GNUNET_SCHEDULER_NO_TASK != h->flush_completion_task)
+  if (NULL != h->flush_completion_task)
     GNUNET_SCHEDULER_cancel (h->flush_completion_task);
+  lost = 0;
   while (NULL != (mq = h->mq_head))
   {
     GNUNET_CONTAINER_DLL_remove (h->mq_head, h->mq_tail, mq);
     GNUNET_free (mq->msg);
     GNUNET_free (mq);
+    lost++;
   }
+  if (0 != lost)
+    LOG (GNUNET_ERROR_TYPE_WARNING, "Cleaning up %u unsent logger message[s]\n",
+         lost);
   GNUNET_CLIENT_disconnect (h->client);
   GNUNET_free (h);
 }
@@ -330,27 +384,59 @@ GNUNET_TESTBED_LOGGER_disconnect (struct GNUNET_TESTBED_LOGGER_Handle *h)
 void
 GNUNET_TESTBED_LOGGER_write (struct GNUNET_TESTBED_LOGGER_Handle *h,
                              const void *data, size_t size)
-{  
+{
   size_t fit_size;
 
   GNUNET_assert (0 != size);
   GNUNET_assert (NULL != data);
-  GNUNET_assert (size < (GNUNET_SERVER_MAX_MESSAGE_SIZE
-                         - sizeof (struct GNUNET_MessageHeader)));
+  GNUNET_assert (size <= (BUFFER_SIZE - sizeof (struct GNUNET_MessageHeader)));
   fit_size = sizeof (struct GNUNET_MessageHeader) + h->bs + size;
-  if ( GNUNET_SERVER_MAX_MESSAGE_SIZE < fit_size )
+  if ( BUFFER_SIZE < fit_size )
     dispatch_buffer (h);
   if (NULL == h->buf)
   {
     h->buf = GNUNET_malloc (size);
     h->bs = size;
     memcpy (h->buf, data, size);
-    return;
+    goto dispatch_ready;
   }
   h->buf = GNUNET_realloc (h->buf, h->bs + size);
   memcpy (h->buf + h->bs, data, size);
   h->bs += size;
-  return;
+
+ dispatch_ready:
+  if (BUFFER_SIZE == fit_size)
+    dispatch_buffer (h);
+}
+
+
+/**
+ * Task to be executed when flushing our local buffer takes longer than timeout
+ * given to GNUNET_TESTBED_LOGGER_flush().  The flush completion callback will
+ * be called with 0 as the amount of data sent.
+ *
+ * @param cls the logger handle
+ * @param tc scheduler task context
+ */
+static void
+timeout_flush (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
+  GNUNET_TESTBED_LOGGER_FlushCompletion cb;
+  void *cb_cls;
+
+  h->timeout_flush_task = NULL;
+  cb = h->cb;
+  h->cb = NULL;
+  cb_cls = h->cb_cls;
+  h->cb_cls = NULL;
+  if (NULL != h->flush_completion_task)
+  {
+    GNUNET_SCHEDULER_cancel (h->flush_completion_task);
+    h->flush_completion_task = NULL;
+  }
+  if (NULL != cb)
+    cb (cb_cls, 0);
 }
 
 
@@ -358,16 +444,21 @@ GNUNET_TESTBED_LOGGER_write (struct GNUNET_TESTBED_LOGGER_Handle *h,
  * Flush the buffered data to the logger service
  *
  * @param h the logger handle
+ * @param timeout how long to wait before calling the flust completion callback
  * @param cb the callback to call after the data is flushed
  * @param cb_cls the closure for the above callback
  */
 void
 GNUNET_TESTBED_LOGGER_flush (struct GNUNET_TESTBED_LOGGER_Handle *h,
+                             struct GNUNET_TIME_Relative timeout,
                              GNUNET_TESTBED_LOGGER_FlushCompletion cb,
                              void *cb_cls)
 {
   h->cb = cb;
   h->cb_cls = cb_cls;
+  GNUNET_assert (NULL == h->timeout_flush_task);
+  h->timeout_flush_task =
+      GNUNET_SCHEDULER_add_delayed (timeout, &timeout_flush, h);
   if (NULL == h->buf)
   {
     trigger_flush_notification (h);
@@ -378,18 +469,22 @@ GNUNET_TESTBED_LOGGER_flush (struct GNUNET_TESTBED_LOGGER_Handle *h,
 
 
 /**
- * Cancel notification upon flush.
+ * Cancel notification upon flush.  Should only be used when the flush
+ * completion callback given to GNUNET_TESTBED_LOGGER_flush() is not already
+ * called.
  *
  * @param h the logger handle
  */
 void
 GNUNET_TESTBED_LOGGER_flush_cancel (struct GNUNET_TESTBED_LOGGER_Handle *h)
 {
-  if (GNUNET_SCHEDULER_NO_TASK != h->flush_completion_task)
+  if (NULL != h->flush_completion_task)
   {
     GNUNET_SCHEDULER_cancel (h->flush_completion_task);
-    h->flush_completion_task = GNUNET_SCHEDULER_NO_TASK;
+    h->flush_completion_task = NULL;
   }
+  if (NULL != h->timeout_flush_task)
+    cancel_timeout_flush (h);
   h->cb = NULL;
   h->cb_cls = NULL;
 }