/*
This file is part of GNUnet
- (C) 2008--2013 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2008--2013 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
/**
* @file testbed/testbed_logger_api.c
* @brief Client-side routines for communicating with the tesbted logger service
- * @author Sree Harsha Totakura <sreeharsha@totakura.in>
+ * @author Sree Harsha Totakura <sreeharsha@totakura.in>
*/
#include "platform.h"
*/
#define GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3)
+/**
+ * The size of the buffer we fill before sending out the message
+ */
+#define BUFFER_SIZE GNUNET_SERVER_MAX_MESSAGE_SIZE
/**
* The message queue for sending messages to the controller service
*/
struct GNUNET_CLIENT_Connection *client;
+ /**
+ * The transport handle
+ */
struct GNUNET_CLIENT_TransmitHandle *th;
/**
*/
struct MessageQueue *mq_tail;
- GNUNET_SCHEDULER_TaskIdentifier flush_completion_task;
-
+ /**
+ * Flush completion callback
+ */
GNUNET_TESTBED_LOGGER_FlushCompletion cb;
+ /**
+ * Closure for the above callback
+ */
void *cb_cls;
+ /**
+ * Local buffer for data to be transmitted
+ */
void *buf;
+ /**
+ * The size of the local buffer
+ */
size_t bs;
+ /**
+ * Number of bytes wrote since last flush
+ */
size_t bwrote;
+ /**
+ * How long after should we retry sending a message to the service?
+ */
struct GNUNET_TIME_Relative retry_backoff;
+
+ /**
+ * Task to call the flush completion callback
+ */
+ struct GNUNET_SCHEDULER_Task * flush_completion_task;
+
+ /**
+ * Task to be executed when flushing takes too long
+ */
+ struct GNUNET_SCHEDULER_Task * timeout_flush_task;
};
+/**
+ * Cancels the flush timeout task
+ *
+ * @param h handle to the logger
+ */
+static void
+cancel_timeout_flush (struct GNUNET_TESTBED_LOGGER_Handle *h)
+{
+ GNUNET_SCHEDULER_cancel (h->timeout_flush_task);
+ h->timeout_flush_task = NULL;
+}
+
+
/**
* Task to call the flush completion notification
*
static void
call_flush_completion (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
+ struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
GNUNET_TESTBED_LOGGER_FlushCompletion cb;
void *cb_cls;
size_t bw;
- h->flush_completion_task = GNUNET_SCHEDULER_NO_TASK;
+ h->flush_completion_task = NULL;
bw = h->bwrote;
h->bwrote = 0;
cb = h->cb;
h->cb = NULL;
cb_cls = h->cb_cls;
h->cb_cls = NULL;
+ if (NULL != h->timeout_flush_task)
+ cancel_timeout_flush (h);
if (NULL != cb)
cb (cb_cls, bw);
}
/**
* Schedule the flush completion notification task
*
- * @param
- * @return
+ * @param h logger handle
*/
static void
trigger_flush_notification (struct GNUNET_TESTBED_LOGGER_Handle *h)
{
- if (GNUNET_SCHEDULER_NO_TASK != h->flush_completion_task)
+ if (NULL != h->flush_completion_task)
GNUNET_SCHEDULER_cancel (h->flush_completion_task);
h->flush_completion_task = GNUNET_SCHEDULER_add_now (&call_flush_completion, h);
}
GNUNET_free (mq->msg);
GNUNET_CONTAINER_DLL_remove (h->mq_head, h->mq_tail, mq);
GNUNET_free (mq);
- h->bwrote += size;
+ h->bwrote += (size - sizeof (struct GNUNET_MessageHeader));
mq = h->mq_head;
if (NULL != mq)
{
type = ntohs (msg->type);
size = ntohs (msg->size);
- mq = GNUNET_malloc (sizeof (struct MessageQueue));
+ mq = GNUNET_new (struct MessageQueue);
mq->msg = msg;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Queueing message of type %u, size %u for sending\n", type,
msg = GNUNET_realloc (h->buf, msize);
h->buf = NULL;
memmove (&msg[1], msg, h->bs);
- h->bs = 0;
+ h->bs = 0;
msg->type = htons (GNUNET_MESSAGE_TYPE_TESTBED_LOGGER_MSG);
msg->size = htons (msize);
queue_message (h, msg);
{
struct GNUNET_TESTBED_LOGGER_Handle *h;
struct GNUNET_CLIENT_Connection *client;
-
+
client = GNUNET_CLIENT_connect ("testbed-logger", cfg);
if (NULL == client)
return NULL;
- h = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_LOGGER_Handle));
+ h = GNUNET_new (struct GNUNET_TESTBED_LOGGER_Handle);
h->client = client;
return h;
}
GNUNET_TESTBED_LOGGER_disconnect (struct GNUNET_TESTBED_LOGGER_Handle *h)
{
struct MessageQueue *mq;
+ unsigned int lost;
- if (GNUNET_SCHEDULER_NO_TASK != h->flush_completion_task)
+ if (NULL != h->flush_completion_task)
GNUNET_SCHEDULER_cancel (h->flush_completion_task);
+ lost = 0;
while (NULL != (mq = h->mq_head))
{
GNUNET_CONTAINER_DLL_remove (h->mq_head, h->mq_tail, mq);
GNUNET_free (mq->msg);
GNUNET_free (mq);
+ lost++;
}
+ if (0 != lost)
+ LOG (GNUNET_ERROR_TYPE_WARNING, "Cleaning up %u unsent logger message[s]\n",
+ lost);
GNUNET_CLIENT_disconnect (h->client);
GNUNET_free (h);
}
* @param h the logger handle
* @param data the data to send;
* @param size how many bytes of data to send
- * @param cb the callback to be called upon completion of the send request
- * @param cb_cls the closure for the above callback
- * @return the send handle which can used for cancelling the send operation.
- * Will be invalid if upon call to completion callback
*/
void
GNUNET_TESTBED_LOGGER_write (struct GNUNET_TESTBED_LOGGER_Handle *h,
const void *data, size_t size)
-{
+{
size_t fit_size;
GNUNET_assert (0 != size);
GNUNET_assert (NULL != data);
- GNUNET_assert (size < (GNUNET_SERVER_MAX_MESSAGE_SIZE
- - sizeof (struct GNUNET_MessageHeader)));
+ GNUNET_assert (size <= (BUFFER_SIZE - sizeof (struct GNUNET_MessageHeader)));
fit_size = sizeof (struct GNUNET_MessageHeader) + h->bs + size;
- if ( GNUNET_SERVER_MAX_MESSAGE_SIZE < fit_size )
+ if ( BUFFER_SIZE < fit_size )
dispatch_buffer (h);
if (NULL == h->buf)
{
h->buf = GNUNET_malloc (size);
h->bs = size;
memcpy (h->buf, data, size);
- return;
+ goto dispatch_ready;
}
h->buf = GNUNET_realloc (h->buf, h->bs + size);
memcpy (h->buf + h->bs, data, size);
h->bs += size;
- return;
+
+ dispatch_ready:
+ if (BUFFER_SIZE == fit_size)
+ dispatch_buffer (h);
+}
+
+
+/**
+ * Task to be executed when flushing our local buffer takes longer than timeout
+ * given to GNUNET_TESTBED_LOGGER_flush(). The flush completion callback will
+ * be called with 0 as the amount of data sent.
+ *
+ * @param cls the logger handle
+ * @param tc scheduler task context
+ */
+static void
+timeout_flush (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_TESTBED_LOGGER_Handle *h = cls;
+ GNUNET_TESTBED_LOGGER_FlushCompletion cb;
+ void *cb_cls;
+
+ h->timeout_flush_task = NULL;
+ cb = h->cb;
+ h->cb = NULL;
+ cb_cls = h->cb_cls;
+ h->cb_cls = NULL;
+ if (NULL != h->flush_completion_task)
+ {
+ GNUNET_SCHEDULER_cancel (h->flush_completion_task);
+ h->flush_completion_task = NULL;
+ }
+ if (NULL != cb)
+ cb (cb_cls, 0);
}
* Flush the buffered data to the logger service
*
* @param h the logger handle
+ * @param timeout how long to wait before calling the flust completion callback
* @param cb the callback to call after the data is flushed
* @param cb_cls the closure for the above callback
*/
void
GNUNET_TESTBED_LOGGER_flush (struct GNUNET_TESTBED_LOGGER_Handle *h,
+ struct GNUNET_TIME_Relative timeout,
GNUNET_TESTBED_LOGGER_FlushCompletion cb,
void *cb_cls)
{
h->cb = cb;
h->cb_cls = cb_cls;
+ GNUNET_assert (NULL == h->timeout_flush_task);
+ h->timeout_flush_task =
+ GNUNET_SCHEDULER_add_delayed (timeout, &timeout_flush, h);
if (NULL == h->buf)
{
trigger_flush_notification (h);
/**
- * Cancel notification upon flush.
+ * Cancel notification upon flush. Should only be used when the flush
+ * completion callback given to GNUNET_TESTBED_LOGGER_flush() is not already
+ * called.
*
* @param h the logger handle
*/
void
GNUNET_TESTBED_LOGGER_flush_cancel (struct GNUNET_TESTBED_LOGGER_Handle *h)
{
- if (GNUNET_SCHEDULER_NO_TASK != h->flush_completion_task)
+ if (NULL != h->flush_completion_task)
{
GNUNET_SCHEDULER_cancel (h->flush_completion_task);
- h->flush_completion_task = GNUNET_SCHEDULER_NO_TASK;
+ h->flush_completion_task = NULL;
}
+ if (NULL != h->timeout_flush_task)
+ cancel_timeout_flush (h);
h->cb = NULL;
h->cb_cls = NULL;
}