From: Christian Grothoff Date: Fri, 24 Jun 2016 14:34:07 +0000 (+0000) Subject: adapting testbed-logger to MQ API X-Git-Tag: initial-import-from-subversion-38251~715 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=4a99626e6a02f71ed4d492d3454f22e1674c9c13;p=oweals%2Fgnunet.git adapting testbed-logger to MQ API --- diff --git a/src/include/gnunet_testbed_logger_service.h b/src/include/gnunet_testbed_logger_service.h index c10c2ee6b..4d7c3f7cb 100644 --- a/src/include/gnunet_testbed_logger_service.h +++ b/src/include/gnunet_testbed_logger_service.h @@ -40,7 +40,7 @@ extern "C" #endif #endif -#include "gnunet_configuration_lib.h" +#include "gnunet_util_lib.h" /** * Opaque handle for the logging service @@ -70,7 +70,7 @@ GNUNET_TESTBED_LOGGER_disconnect (struct GNUNET_TESTBED_LOGGER_Handle *h); /** * Functions of this type are called to notify a successful transmission of the - * message to the logger service + * message to the logger service. * * @param cls the closure given to GNUNET_TESTBED_LOGGER_send() * @param size the amount of data sent @@ -87,7 +87,7 @@ typedef void * * @param h the logger handle * @param data the data to send; - * @param size how many bytes of data to send + * @param size how many bytes of @a data to send */ void GNUNET_TESTBED_LOGGER_write (struct GNUNET_TESTBED_LOGGER_Handle *h, @@ -99,13 +99,11 @@ GNUNET_TESTBED_LOGGER_write (struct GNUNET_TESTBED_LOGGER_Handle *h, * Flush the buffered data to the logger service * * @param h the logger handle - * @param timeout how long to wait before calling the flust completion callback * @param cb the callback to call after the data is flushed - * @param cb_cls the closure for the above callback + * @param cb_cls the closure for @a cb */ void GNUNET_TESTBED_LOGGER_flush (struct GNUNET_TESTBED_LOGGER_Handle *h, - struct GNUNET_TIME_Relative timeout, GNUNET_TESTBED_LOGGER_FlushCompletion cb, void *cb_cls); diff --git a/src/nse/gnunet-service-nse.c b/src/nse/gnunet-service-nse.c index 4d920465c..3db330204 100644 --- a/src/nse/gnunet-service-nse.c +++ b/src/nse/gnunet-service-nse.c @@ -1386,9 +1386,9 @@ shutdown_task (void *cls) } if (NULL != lh) { - struct GNUNET_TIME_Relative timeout; - timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30); - GNUNET_TESTBED_LOGGER_flush (lh, timeout, &flush_comp_cb, NULL); + GNUNET_TESTBED_LOGGER_flush (lh, + &flush_comp_cb, + NULL); } if (NULL != histogram) { diff --git a/src/testbed-logger/test_testbed_logger_api.c b/src/testbed-logger/test_testbed_logger_api.c index 8f7391f22..0ebe0c3f4 100644 --- a/src/testbed-logger/test_testbed_logger_api.c +++ b/src/testbed-logger/test_testbed_logger_api.c @@ -209,7 +209,6 @@ do_write (void *cls) if (0 == i++) return; GNUNET_TESTBED_LOGGER_flush (h, - GNUNET_TIME_UNIT_FOREVER_REL, &flush_comp, &write_task); } diff --git a/src/testbed-logger/testbed_logger_api.c b/src/testbed-logger/testbed_logger_api.c index aaf18cd33..869054cf3 100644 --- a/src/testbed-logger/testbed_logger_api.c +++ b/src/testbed-logger/testbed_logger_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - Copyright (C) 2008--2013 GNUnet e.V. + Copyright (C) 2008--2013, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -22,6 +22,7 @@ * @file testbed-logger/testbed_logger_api.c * @brief Client-side routines for communicating with the tesbted logger service * @author Sree Harsha Totakura + * @author Christian Grothoff */ #include "platform.h" @@ -34,57 +35,11 @@ #define LOG(kind, ...) \ GNUNET_log_from (kind, "testbed-logger-api", __VA_ARGS__) -/** - * Debug logging - */ -#define LOG_DEBUG(...) \ - LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__) - -#ifdef GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD -#undef GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD -#endif - -/** - * Threshold after which exponential backoff should not increase (15 s). - */ -#define GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3) /** * The size of the buffer we fill before sending out the message */ -#define BUFFER_SIZE GNUNET_SERVER_MAX_MESSAGE_SIZE - -/** - * The message queue for sending messages to the controller service - */ -struct MessageQueue -{ - /** - * next pointer for DLL - */ - struct MessageQueue *next; - - /** - * prev pointer for DLL - */ - struct MessageQueue *prev; - - /** - * The message to be sent - */ - struct GNUNET_MessageHeader *msg; - - /** - * Completion callback - */ - GNUNET_TESTBED_LOGGER_FlushCompletion cb; - - /** - * callback closure - */ - void *cb_cls; -}; - +#define BUFFER_SIZE (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct GNUNET_MessageHeader)) /** * Connection handle for the logger service @@ -94,22 +49,7 @@ struct GNUNET_TESTBED_LOGGER_Handle /** * Client connection */ - struct GNUNET_CLIENT_Connection *client; - - /** - * The transport handle - */ - struct GNUNET_CLIENT_TransmitHandle *th; - - /** - * DLL head for the message queue - */ - struct MessageQueue *mq_head; - - /** - * DLL tail for the message queue - */ - struct MessageQueue *mq_tail; + struct GNUNET_MQ_Handle *mq; /** * Flush completion callback @@ -117,19 +57,19 @@ struct GNUNET_TESTBED_LOGGER_Handle GNUNET_TESTBED_LOGGER_FlushCompletion cb; /** - * Closure for the above callback + * Closure for @e cb */ void *cb_cls; /** * Local buffer for data to be transmitted */ - void *buf; + char buf[BUFFER_SIZE]; /** - * The size of the local buffer + * How many bytes in @a buf are in use? */ - size_t bs; + size_t buse; /** * Number of bytes wrote since last flush @@ -144,28 +84,15 @@ struct GNUNET_TESTBED_LOGGER_Handle /** * Task to call the flush completion callback */ - struct GNUNET_SCHEDULER_Task * flush_completion_task; + struct GNUNET_SCHEDULER_Task *flush_completion_task; /** - * Task to be executed when flushing takes too long + * Number of entries in the MQ. */ - struct GNUNET_SCHEDULER_Task * timeout_flush_task; + unsigned int mq_len; }; -/** - * Cancels the flush timeout task - * - * @param h handle to the logger - */ -static void -cancel_timeout_flush (struct GNUNET_TESTBED_LOGGER_Handle *h) -{ - GNUNET_SCHEDULER_cancel (h->timeout_flush_task); - h->timeout_flush_task = NULL; -} - - /** * Task to call the flush completion notification * @@ -186,8 +113,6 @@ call_flush_completion (void *cls) h->cb = NULL; cb_cls = h->cb_cls; h->cb_cls = NULL; - if (NULL != h->timeout_flush_task) - cancel_timeout_flush (h); if (NULL != cb) cb (cb_cls, bw); } @@ -203,97 +128,39 @@ trigger_flush_notification (struct GNUNET_TESTBED_LOGGER_Handle *h) { if (NULL != h->flush_completion_task) GNUNET_SCHEDULER_cancel (h->flush_completion_task); - h->flush_completion_task = GNUNET_SCHEDULER_add_now (&call_flush_completion, h); + h->flush_completion_task + = GNUNET_SCHEDULER_add_now (&call_flush_completion, + h); } /** - * Function called to notify a client about the connection begin ready to queue - * more data. "buf" will be NULL and "size" zero if the connection was closed - * for writing in the meantime. + * Send the buffered data to the service * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param h the logger handle */ -static size_t -transmit_ready_notify (void *cls, size_t size, void *buf) -{ - struct GNUNET_TESTBED_LOGGER_Handle *h = cls; - struct MessageQueue *mq; - - h->th = NULL; - mq = h->mq_head; - GNUNET_assert (NULL != mq); - if ((0 == size) && (NULL == buf)) /* Timeout */ - { - LOG_DEBUG ("Message sending timed out -- retrying\n"); - h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff); - h->th = - GNUNET_CLIENT_notify_transmit_ready (h->client, - ntohs (mq->msg->size), - h->retry_backoff, GNUNET_YES, - &transmit_ready_notify, h); - return 0; - } - h->retry_backoff = GNUNET_TIME_UNIT_ZERO; - GNUNET_assert (ntohs (mq->msg->size) <= size); - size = ntohs (mq->msg->size); - memcpy (buf, mq->msg, size); - LOG_DEBUG ("Message of type: %u and size: %u sent\n", - ntohs (mq->msg->type), size); - GNUNET_free (mq->msg); - GNUNET_CONTAINER_DLL_remove (h->mq_head, h->mq_tail, mq); - GNUNET_free (mq); - h->bwrote += (size - sizeof (struct GNUNET_MessageHeader)); - mq = h->mq_head; - if (NULL != mq) - { - h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff); - h->th = - GNUNET_CLIENT_notify_transmit_ready (h->client, - ntohs (mq->msg->size), - h->retry_backoff, GNUNET_YES, - &transmit_ready_notify, h); - return size; - } - if (NULL != h->cb) - trigger_flush_notification (h); /* Call the flush completion callback */ - return size; -} +static void +dispatch_buffer (struct GNUNET_TESTBED_LOGGER_Handle *h); /** - * Queues a message in send queue of the logger handle + * MQ successfully sent a message. * - * @param h the logger handle - * @param msg the message to queue + * @param cls our handle */ static void -queue_message (struct GNUNET_TESTBED_LOGGER_Handle *h, - struct GNUNET_MessageHeader *msg) +notify_sent (void *cls) { - struct MessageQueue *mq; - uint16_t type; - uint16_t size; - - type = ntohs (msg->type); - size = ntohs (msg->size); - mq = GNUNET_new (struct MessageQueue); - mq->msg = msg; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Queueing message of type %u, size %u for sending\n", type, - ntohs (msg->size)); - GNUNET_CONTAINER_DLL_insert_tail (h->mq_head, h->mq_tail, mq); - if (NULL == h->th) + struct GNUNET_TESTBED_LOGGER_Handle *h = cls; + + h->mq_len--; + if ( (0 == h->mq_len) && + (NULL != h->cb) ) { - h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff); - h->th = - GNUNET_CLIENT_notify_transmit_ready (h->client, size, - h->retry_backoff, GNUNET_YES, - &transmit_ready_notify, - h); + if (0 == h->buse) + trigger_flush_notification (h); + else + dispatch_buffer (h); } } @@ -307,16 +174,40 @@ static void dispatch_buffer (struct GNUNET_TESTBED_LOGGER_Handle *h) { struct GNUNET_MessageHeader *msg; - size_t msize; - - msize = sizeof (struct GNUNET_MessageHeader) + h->bs; - msg = GNUNET_realloc (h->buf, msize); - h->buf = NULL; - memmove (&msg[1], msg, h->bs); - h->bs = 0; - msg->type = htons (GNUNET_MESSAGE_TYPE_TESTBED_LOGGER_MSG); - msg->size = htons (msize); - queue_message (h, msg); + struct GNUNET_MQ_Envelope *env; + + env = GNUNET_MQ_msg_extra (msg, + h->buse, + GNUNET_MESSAGE_TYPE_TESTBED_LOGGER_MSG); + memcpy (&msg[1], + h->buf, + h->buse); + h->bwrote += h->buse; + h->buse = 0; + h->mq_len++; + GNUNET_MQ_notify_sent (env, + ¬ify_sent, + h); + GNUNET_MQ_send (h->mq, + env); +} + + +/** + * We got disconnected from the logger. Stop logging. + * + * @param cls the `struct GNUNET_TESTBED_LOGGER_Handle` + * @param error error code + */ +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_TESTBED_LOGGER_Handle *h = cls; + + GNUNET_break (0); + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } @@ -331,13 +222,18 @@ struct GNUNET_TESTBED_LOGGER_Handle * GNUNET_TESTBED_LOGGER_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) { struct GNUNET_TESTBED_LOGGER_Handle *h; - struct GNUNET_CLIENT_Connection *client; - client = GNUNET_CLIENT_connect ("testbed-logger", cfg); - if (NULL == client) - return NULL; h = GNUNET_new (struct GNUNET_TESTBED_LOGGER_Handle); - h->client = client; + h->mq = GNUNET_CLIENT_connecT (cfg, + "testbed-logger", + NULL, + &mq_error_handler, + h); + if (NULL == h->mq) + { + GNUNET_free (h); + return NULL; + } return h; } @@ -350,23 +246,20 @@ GNUNET_TESTBED_LOGGER_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) void GNUNET_TESTBED_LOGGER_disconnect (struct GNUNET_TESTBED_LOGGER_Handle *h) { - struct MessageQueue *mq; - unsigned int lost; - if (NULL != h->flush_completion_task) + { GNUNET_SCHEDULER_cancel (h->flush_completion_task); - lost = 0; - while (NULL != (mq = h->mq_head)) + h->flush_completion_task = NULL; + } + if (0 != h->mq_len) + LOG (GNUNET_ERROR_TYPE_WARNING, + "Disconnect lost %u logger message[s]\n", + h->mq_len); + if (NULL != h->mq) { - GNUNET_CONTAINER_DLL_remove (h->mq_head, h->mq_tail, mq); - GNUNET_free (mq->msg); - GNUNET_free (mq); - lost++; + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } - if (0 != lost) - LOG (GNUNET_ERROR_TYPE_WARNING, "Cleaning up %u unsent logger message[s]\n", - lost); - GNUNET_CLIENT_disconnect (h->client); GNUNET_free (h); } @@ -378,63 +271,28 @@ GNUNET_TESTBED_LOGGER_disconnect (struct GNUNET_TESTBED_LOGGER_Handle *h) * * @param h the logger handle * @param data the data to send; - * @param size how many bytes of data to send + * @param size how many bytes of @a data to send */ void GNUNET_TESTBED_LOGGER_write (struct GNUNET_TESTBED_LOGGER_Handle *h, - const void *data, size_t size) + const void *data, + size_t size) { - size_t fit_size; - - GNUNET_assert (0 != size); - GNUNET_assert (NULL != data); - GNUNET_assert (size <= (BUFFER_SIZE - sizeof (struct GNUNET_MessageHeader))); - fit_size = sizeof (struct GNUNET_MessageHeader) + h->bs + size; - if ( BUFFER_SIZE < fit_size ) - dispatch_buffer (h); - if (NULL == h->buf) - { - h->buf = GNUNET_malloc (size); - h->bs = size; - memcpy (h->buf, data, size); - goto dispatch_ready; - } - h->buf = GNUNET_realloc (h->buf, h->bs + size); - memcpy (h->buf + h->bs, data, size); - h->bs += size; - - dispatch_ready: - if (BUFFER_SIZE == fit_size) - dispatch_buffer (h); -} - - -/** - * Task to be executed when flushing our local buffer takes longer than timeout - * given to GNUNET_TESTBED_LOGGER_flush(). The flush completion callback will - * be called with 0 as the amount of data sent. - * - * @param cls the logger handle - */ -static void -timeout_flush (void *cls) -{ - struct GNUNET_TESTBED_LOGGER_Handle *h = cls; - GNUNET_TESTBED_LOGGER_FlushCompletion cb; - void *cb_cls; - - h->timeout_flush_task = NULL; - cb = h->cb; - h->cb = NULL; - cb_cls = h->cb_cls; - h->cb_cls = NULL; - if (NULL != h->flush_completion_task) + if (NULL == h->mq) + return; + while (0 != size) { - GNUNET_SCHEDULER_cancel (h->flush_completion_task); - h->flush_completion_task = NULL; + size_t fit_size = GNUNET_MIN (size, + BUFFER_SIZE - h->buse); + memcpy (&h->buf[h->buse], + data, + fit_size); + h->buse += fit_size; + data += fit_size; + size -= fit_size; + if (0 != size) + dispatch_buffer (h); } - if (NULL != cb) - cb (cb_cls, 0); } @@ -442,22 +300,19 @@ timeout_flush (void *cls) * Flush the buffered data to the logger service * * @param h the logger handle - * @param timeout how long to wait before calling the flust completion callback * @param cb the callback to call after the data is flushed * @param cb_cls the closure for the above callback */ void GNUNET_TESTBED_LOGGER_flush (struct GNUNET_TESTBED_LOGGER_Handle *h, - struct GNUNET_TIME_Relative timeout, GNUNET_TESTBED_LOGGER_FlushCompletion cb, void *cb_cls) { + GNUNET_assert (NULL == h->cb); h->cb = cb; h->cb_cls = cb_cls; - GNUNET_assert (NULL == h->timeout_flush_task); - h->timeout_flush_task = - GNUNET_SCHEDULER_add_delayed (timeout, &timeout_flush, h); - if (NULL == h->buf) + if ( (NULL == h->mq) || + (NULL == h->buf) ) { trigger_flush_notification (h); return; @@ -481,8 +336,6 @@ GNUNET_TESTBED_LOGGER_flush_cancel (struct GNUNET_TESTBED_LOGGER_Handle *h) GNUNET_SCHEDULER_cancel (h->flush_completion_task); h->flush_completion_task = NULL; } - if (NULL != h->timeout_flush_task) - cancel_timeout_flush (h); h->cb = NULL; h->cb_cls = NULL; }