/*
This file is part of GNUnet
- Copyright (C) 2008--2013 GNUnet e.V.
+ Copyright (C) 2008--2013, 2016 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
* @file testbed-logger/testbed_logger_api.c
* @brief Client-side routines for communicating with the tesbted logger service
* @author Sree Harsha Totakura <sreeharsha@totakura.in>
+ * @author Christian Grothoff
*/
#include "platform.h"
#define LOG(kind, ...) \
GNUNET_log_from (kind, "testbed-logger-api", __VA_ARGS__)
-/**
- * Debug logging
- */
-#define LOG_DEBUG(...) \
- LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
-
-#ifdef GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD
-#undef GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD
-#endif
-
-/**
- * Threshold after which exponential backoff should not increase (15 s).
- */
-#define GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
/**
* The size of the buffer we fill before sending out the message
*/
-#define BUFFER_SIZE GNUNET_SERVER_MAX_MESSAGE_SIZE
-
-/**
- * The message queue for sending messages to the controller service
- */
-struct MessageQueue
-{
- /**
- * next pointer for DLL
- */
- struct MessageQueue *next;
-
- /**
- * prev pointer for DLL
- */
- struct MessageQueue *prev;
-
- /**
- * The message to be sent
- */
- struct GNUNET_MessageHeader *msg;
-
- /**
- * Completion callback
- */
- GNUNET_TESTBED_LOGGER_FlushCompletion cb;
-
- /**
- * callback closure
- */
- void *cb_cls;
-};
-
+#define BUFFER_SIZE (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct GNUNET_MessageHeader))
/**
* Connection handle for the logger service
/**
* Client connection
*/
- struct GNUNET_CLIENT_Connection *client;
-
- /**
- * The transport handle
- */
- struct GNUNET_CLIENT_TransmitHandle *th;
-
- /**
- * DLL head for the message queue
- */
- struct MessageQueue *mq_head;
-
- /**
- * DLL tail for the message queue
- */
- struct MessageQueue *mq_tail;
+ struct GNUNET_MQ_Handle *mq;
/**
* Flush completion callback
GNUNET_TESTBED_LOGGER_FlushCompletion cb;
/**
- * Closure for the above callback
+ * Closure for @e cb
*/
void *cb_cls;
/**
* Local buffer for data to be transmitted
*/
- void *buf;
+ char buf[BUFFER_SIZE];
/**
- * The size of the local buffer
+ * How many bytes in @a buf are in use?
*/
- size_t bs;
+ size_t buse;
/**
* Number of bytes wrote since last flush
/**
* Task to call the flush completion callback
*/
- struct GNUNET_SCHEDULER_Task * flush_completion_task;
+ struct GNUNET_SCHEDULER_Task *flush_completion_task;
/**
- * Task to be executed when flushing takes too long
+ * Number of entries in the MQ.
*/
- struct GNUNET_SCHEDULER_Task * timeout_flush_task;
+ unsigned int mq_len;
};
-/**
- * Cancels the flush timeout task
- *
- * @param h handle to the logger
- */
-static void
-cancel_timeout_flush (struct GNUNET_TESTBED_LOGGER_Handle *h)
-{
- GNUNET_SCHEDULER_cancel (h->timeout_flush_task);
- h->timeout_flush_task = NULL;
-}
-
-
/**
* Task to call the flush completion notification
*
h->cb = NULL;
cb_cls = h->cb_cls;
h->cb_cls = NULL;
- if (NULL != h->timeout_flush_task)
- cancel_timeout_flush (h);
if (NULL != cb)
cb (cb_cls, bw);
}
{
if (NULL != h->flush_completion_task)
GNUNET_SCHEDULER_cancel (h->flush_completion_task);
- h->flush_completion_task = GNUNET_SCHEDULER_add_now (&call_flush_completion, h);
+ h->flush_completion_task
+ = GNUNET_SCHEDULER_add_now (&call_flush_completion,
+ h);
}
/**
- * Function called to notify a client about the connection begin ready to queue
- * more data. "buf" will be NULL and "size" zero if the connection was closed
- * for writing in the meantime.
+ * Send the buffered data to the service
*
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
+ * @param h the logger handle
*/
-static size_t
-transmit_ready_notify (void *cls, size_t size, void *buf)
-{
- struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
- struct MessageQueue *mq;
-
- h->th = NULL;
- mq = h->mq_head;
- GNUNET_assert (NULL != mq);
- if ((0 == size) && (NULL == buf)) /* Timeout */
- {
- LOG_DEBUG ("Message sending timed out -- retrying\n");
- h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
- h->th =
- GNUNET_CLIENT_notify_transmit_ready (h->client,
- ntohs (mq->msg->size),
- h->retry_backoff, GNUNET_YES,
- &transmit_ready_notify, h);
- return 0;
- }
- h->retry_backoff = GNUNET_TIME_UNIT_ZERO;
- GNUNET_assert (ntohs (mq->msg->size) <= size);
- size = ntohs (mq->msg->size);
- memcpy (buf, mq->msg, size);
- LOG_DEBUG ("Message of type: %u and size: %u sent\n",
- ntohs (mq->msg->type), size);
- GNUNET_free (mq->msg);
- GNUNET_CONTAINER_DLL_remove (h->mq_head, h->mq_tail, mq);
- GNUNET_free (mq);
- h->bwrote += (size - sizeof (struct GNUNET_MessageHeader));
- mq = h->mq_head;
- if (NULL != mq)
- {
- h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
- h->th =
- GNUNET_CLIENT_notify_transmit_ready (h->client,
- ntohs (mq->msg->size),
- h->retry_backoff, GNUNET_YES,
- &transmit_ready_notify, h);
- return size;
- }
- if (NULL != h->cb)
- trigger_flush_notification (h); /* Call the flush completion callback */
- return size;
-}
+static void
+dispatch_buffer (struct GNUNET_TESTBED_LOGGER_Handle *h);
/**
- * Queues a message in send queue of the logger handle
+ * MQ successfully sent a message.
*
- * @param h the logger handle
- * @param msg the message to queue
+ * @param cls our handle
*/
static void
-queue_message (struct GNUNET_TESTBED_LOGGER_Handle *h,
- struct GNUNET_MessageHeader *msg)
+notify_sent (void *cls)
{
- struct MessageQueue *mq;
- uint16_t type;
- uint16_t size;
-
- type = ntohs (msg->type);
- size = ntohs (msg->size);
- mq = GNUNET_new (struct MessageQueue);
- mq->msg = msg;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Queueing message of type %u, size %u for sending\n", type,
- ntohs (msg->size));
- GNUNET_CONTAINER_DLL_insert_tail (h->mq_head, h->mq_tail, mq);
- if (NULL == h->th)
+ struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
+
+ h->mq_len--;
+ if ( (0 == h->mq_len) &&
+ (NULL != h->cb) )
{
- h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff);
- h->th =
- GNUNET_CLIENT_notify_transmit_ready (h->client, size,
- h->retry_backoff, GNUNET_YES,
- &transmit_ready_notify,
- h);
+ if (0 == h->buse)
+ trigger_flush_notification (h);
+ else
+ dispatch_buffer (h);
}
}
dispatch_buffer (struct GNUNET_TESTBED_LOGGER_Handle *h)
{
struct GNUNET_MessageHeader *msg;
- size_t msize;
-
- msize = sizeof (struct GNUNET_MessageHeader) + h->bs;
- msg = GNUNET_realloc (h->buf, msize);
- h->buf = NULL;
- memmove (&msg[1], msg, h->bs);
- h->bs = 0;
- msg->type = htons (GNUNET_MESSAGE_TYPE_TESTBED_LOGGER_MSG);
- msg->size = htons (msize);
- queue_message (h, msg);
+ struct GNUNET_MQ_Envelope *env;
+
+ env = GNUNET_MQ_msg_extra (msg,
+ h->buse,
+ GNUNET_MESSAGE_TYPE_TESTBED_LOGGER_MSG);
+ memcpy (&msg[1],
+ h->buf,
+ h->buse);
+ h->bwrote += h->buse;
+ h->buse = 0;
+ h->mq_len++;
+ GNUNET_MQ_notify_sent (env,
+ ¬ify_sent,
+ h);
+ GNUNET_MQ_send (h->mq,
+ env);
+}
+
+
+/**
+ * We got disconnected from the logger. Stop logging.
+ *
+ * @param cls the `struct GNUNET_TESTBED_LOGGER_Handle`
+ * @param error error code
+ */
+static void
+mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
+
+ GNUNET_break (0);
+ GNUNET_MQ_destroy (h->mq);
+ h->mq = NULL;
}
GNUNET_TESTBED_LOGGER_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
{
struct GNUNET_TESTBED_LOGGER_Handle *h;
- struct GNUNET_CLIENT_Connection *client;
- client = GNUNET_CLIENT_connect ("testbed-logger", cfg);
- if (NULL == client)
- return NULL;
h = GNUNET_new (struct GNUNET_TESTBED_LOGGER_Handle);
- h->client = client;
+ h->mq = GNUNET_CLIENT_connecT (cfg,
+ "testbed-logger",
+ NULL,
+ &mq_error_handler,
+ h);
+ if (NULL == h->mq)
+ {
+ GNUNET_free (h);
+ return NULL;
+ }
return h;
}
void
GNUNET_TESTBED_LOGGER_disconnect (struct GNUNET_TESTBED_LOGGER_Handle *h)
{
- struct MessageQueue *mq;
- unsigned int lost;
-
if (NULL != h->flush_completion_task)
+ {
GNUNET_SCHEDULER_cancel (h->flush_completion_task);
- lost = 0;
- while (NULL != (mq = h->mq_head))
+ h->flush_completion_task = NULL;
+ }
+ if (0 != h->mq_len)
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Disconnect lost %u logger message[s]\n",
+ h->mq_len);
+ if (NULL != h->mq)
{
- GNUNET_CONTAINER_DLL_remove (h->mq_head, h->mq_tail, mq);
- GNUNET_free (mq->msg);
- GNUNET_free (mq);
- lost++;
+ GNUNET_MQ_destroy (h->mq);
+ h->mq = NULL;
}
- if (0 != lost)
- LOG (GNUNET_ERROR_TYPE_WARNING, "Cleaning up %u unsent logger message[s]\n",
- lost);
- GNUNET_CLIENT_disconnect (h->client);
GNUNET_free (h);
}
*
* @param h the logger handle
* @param data the data to send;
- * @param size how many bytes of data to send
+ * @param size how many bytes of @a data to send
*/
void
GNUNET_TESTBED_LOGGER_write (struct GNUNET_TESTBED_LOGGER_Handle *h,
- const void *data, size_t size)
+ const void *data,
+ size_t size)
{
- size_t fit_size;
-
- GNUNET_assert (0 != size);
- GNUNET_assert (NULL != data);
- GNUNET_assert (size <= (BUFFER_SIZE - sizeof (struct GNUNET_MessageHeader)));
- fit_size = sizeof (struct GNUNET_MessageHeader) + h->bs + size;
- if ( BUFFER_SIZE < fit_size )
- dispatch_buffer (h);
- if (NULL == h->buf)
- {
- h->buf = GNUNET_malloc (size);
- h->bs = size;
- memcpy (h->buf, data, size);
- goto dispatch_ready;
- }
- h->buf = GNUNET_realloc (h->buf, h->bs + size);
- memcpy (h->buf + h->bs, data, size);
- h->bs += size;
-
- dispatch_ready:
- if (BUFFER_SIZE == fit_size)
- dispatch_buffer (h);
-}
-
-
-/**
- * Task to be executed when flushing our local buffer takes longer than timeout
- * given to GNUNET_TESTBED_LOGGER_flush(). The flush completion callback will
- * be called with 0 as the amount of data sent.
- *
- * @param cls the logger handle
- */
-static void
-timeout_flush (void *cls)
-{
- struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
- GNUNET_TESTBED_LOGGER_FlushCompletion cb;
- void *cb_cls;
-
- h->timeout_flush_task = NULL;
- cb = h->cb;
- h->cb = NULL;
- cb_cls = h->cb_cls;
- h->cb_cls = NULL;
- if (NULL != h->flush_completion_task)
+ if (NULL == h->mq)
+ return;
+ while (0 != size)
{
- GNUNET_SCHEDULER_cancel (h->flush_completion_task);
- h->flush_completion_task = NULL;
+ size_t fit_size = GNUNET_MIN (size,
+ BUFFER_SIZE - h->buse);
+ memcpy (&h->buf[h->buse],
+ data,
+ fit_size);
+ h->buse += fit_size;
+ data += fit_size;
+ size -= fit_size;
+ if (0 != size)
+ dispatch_buffer (h);
}
- if (NULL != cb)
- cb (cb_cls, 0);
}
* Flush the buffered data to the logger service
*
* @param h the logger handle
- * @param timeout how long to wait before calling the flust completion callback
* @param cb the callback to call after the data is flushed
* @param cb_cls the closure for the above callback
*/
void
GNUNET_TESTBED_LOGGER_flush (struct GNUNET_TESTBED_LOGGER_Handle *h,
- struct GNUNET_TIME_Relative timeout,
GNUNET_TESTBED_LOGGER_FlushCompletion cb,
void *cb_cls)
{
+ GNUNET_assert (NULL == h->cb);
h->cb = cb;
h->cb_cls = cb_cls;
- GNUNET_assert (NULL == h->timeout_flush_task);
- h->timeout_flush_task =
- GNUNET_SCHEDULER_add_delayed (timeout, &timeout_flush, h);
- if (NULL == h->buf)
+ if ( (NULL == h->mq) ||
+ (NULL == h->buf) )
{
trigger_flush_notification (h);
return;
GNUNET_SCHEDULER_cancel (h->flush_completion_task);
h->flush_completion_task = NULL;
}
- if (NULL != h->timeout_flush_task)
- cancel_timeout_flush (h);
h->cb = NULL;
h->cb_cls = NULL;
}