From 19e8c3f5f9817643dc6aea505abec3ed40238b3d Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 29 Aug 2016 13:29:04 +0000 Subject: [PATCH] -towards a new MST for the new service --- src/include/Makefile.am | 1 + src/include/gnunet_mst_lib.h | 162 +++++++++++++++ src/include/gnunet_util_lib.h | 1 + src/util/Makefile.am | 1 + src/util/mst.c | 363 ++++++++++++++++++++++++++++++++++ src/util/service_new.c | 23 +-- 6 files changed, 531 insertions(+), 20 deletions(-) create mode 100644 src/include/gnunet_mst_lib.h create mode 100644 src/util/mst.c diff --git a/src/include/Makefile.am b/src/include/Makefile.am index 45cb6e5c6..866a90bef 100644 --- a/src/include/Makefile.am +++ b/src/include/Makefile.am @@ -71,6 +71,7 @@ gnunetinclude_HEADERS = \ gnunet_cadet_service.h \ gnunet_microphone_lib.h \ gnunet_multicast_service.h \ + gnunet_mst_lib.h \ gnunet_mq_lib.h \ gnunet_my_lib.h \ gnunet_mysql_lib.h \ diff --git a/src/include/gnunet_mst_lib.h b/src/include/gnunet_mst_lib.h new file mode 100644 index 000000000..7a1ca7a55 --- /dev/null +++ b/src/include/gnunet_mst_lib.h @@ -0,0 +1,162 @@ +/* + This file is part of GNUnet. + Copyright (C) 2009-2013, 2016 GNUnet e.V. + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. +*/ + +/** + * @author Christian Grothoff + * + * @file + * Library for tokenizing a message stream + + * @defgroup server Server library + * Library for tokenizing a message stream + * + * @see [Documentation](https://gnunet.org/mst) + * + * @{ + */ + +#ifndef GNUNET_MST_LIB_H +#define GNUNET_MST_LIB_H + +#ifdef __cplusplus +extern "C" +{ +#if 0 /* keep Emacsens' auto-indent happy */ +} +#endif +#endif + +#include "gnunet_common.h" + + +/** + * Handle to a message stream tokenizer. + */ +struct GNUNET_MessageStreamTokenizer; + + +/** + * Functions with this signature are called whenever a + * complete message is received by the tokenizer. + * + * Do not call #GNUNET_mst_destroy from within + * the scope of this callback. + * + * @param cls closure + * @param message the actual message + * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing + */ +typedef int +(*GNUNET_MessageTokenizerCallback) (void *cls, + const struct GNUNET_MessageHeader *message); + + +/** + * Create a message stream tokenizer. + * + * @param cb function to call on completed messages + * @param cb_cls closure for @a cb + * @return handle to tokenizer + */ +struct GNUNET_MessageStreamTokenizer * +GNUNET_MST_create (GNUNET_MessageTokenizerCallback cb, + void *cb_cls); + + +/** + * Add incoming data to the receive buffer and call the + * callback for all complete messages. + * + * @param mst tokenizer to use + * @param buf input data to add + * @param size number of bytes in @a buf + * @param purge should any excess bytes in the buffer be discarded + * (i.e. for packet-based services like UDP) + * @param one_shot only call callback once, keep rest of message in buffer + * @return #GNUNET_OK if we are done processing (need more data) + * #GNUNET_NO if one_shot was set and we have another message ready + * #GNUNET_SYSERR if the data stream is corrupt + */ +int +GNUNET_MST_from_buffer (struct GNUNET_MessageStreamTokenizer *mst, + const char *buf, + size_t size, + int purge, + int one_shot); + + +/** + * Add incoming data to the receive buffer and call the + * callback for all complete messages. + * + * @param mst tokenizer to use + * @param buf input data to add + * @param size number of bytes in @a buf + * @param purge should any excess bytes in the buffer be discarded + * (i.e. for packet-based services like UDP) + * @param one_shot only call callback once, keep rest of message in buffer + * @return #GNUNET_OK if we are done processing (need more data) + * #GNUNET_NO if one_shot was set and we have another message ready + * #GNUNET_SYSERR if the data stream is corrupt + */ +int +GNUNET_MST_read (struct GNUNET_MessageStreamTokenizer *mst, + struct GNUNET_NETWORK_Handle *sock, + int purge, + int one_shot); + + +/** + * Obtain the next message from the @a mst, assuming that + * there are more unprocessed messages in the internal buffer + * of the @a mst. + * + * @param mst tokenizer to use + * @param one_shot only call callback once, keep rest of message in buffer + * @return #GNUNET_OK if we are done processing (need more data) + * #GNUNET_NO if one_shot was set and we have another message ready + * #GNUNET_SYSERR if the data stream is corrupt + */ +int +GNUNET_MST_next (struct GNUNET_MessageStreamTokenizer *mst, + int one_shot); + + +/** + * Destroys a tokenizer. + * + * @param mst tokenizer to destroy + */ +void +GNUNET_MST_destroy (struct GNUNET_MessageStreamTokenizer *mst); + + +#if 0 /* keep Emacsens' auto-indent happy */ +{ +#endif +#ifdef __cplusplus +} +#endif + +#endif + +/** @} */ /* end of group server */ + +/* end of gnunet_mst_lib.h */ diff --git a/src/include/gnunet_util_lib.h b/src/include/gnunet_util_lib.h index 599a04c0f..f0d964296 100644 --- a/src/include/gnunet_util_lib.h +++ b/src/include/gnunet_util_lib.h @@ -47,6 +47,7 @@ extern "C" #include "gnunet_container_lib.h" #include "gnunet_getopt_lib.h" #include "gnunet_helper_lib.h" +#include "gnunet_mst_lib.h" #include "gnunet_mq_lib.h" #include "gnunet_op_lib.h" #include "gnunet_os_lib.h" diff --git a/src/util/Makefile.am b/src/util/Makefile.am index d6c93cefb..6822d539d 100644 --- a/src/util/Makefile.am +++ b/src/util/Makefile.am @@ -92,6 +92,7 @@ libgnunetutil_la_SOURCES = \ getopt_helpers.c \ helper.c \ load.c \ + mst.c \ mq.c \ network.c \ op.c \ diff --git a/src/util/mst.c b/src/util/mst.c new file mode 100644 index 000000000..578ba8e04 --- /dev/null +++ b/src/util/mst.c @@ -0,0 +1,363 @@ +/* + This file is part of GNUnet. + Copyright (C) 2010, 2016 GNUnet e.V. + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. +*/ + +/** + * @file util/mst.c + * @brief convenience functions for handling inbound message buffers + * @author Christian Grothoff + */ + +#include "platform.h" +#include "gnunet_util_lib.h" + + +#if HAVE_UNALIGNED_64_ACCESS +#define ALIGN_FACTOR 4 +#else +#define ALIGN_FACTOR 8 +#endif + +#define LOG(kind,...) GNUNET_log_from (kind, "util", __VA_ARGS__) + + +/** + * Handle to a message stream tokenizer. + */ +struct GNUNET_MessageStreamTokenizer +{ + + /** + * Function to call on completed messages. + */ + GNUNET_MessageTokenizerCallback cb; + + /** + * Closure for @e cb. + */ + void *cb_cls; + + /** + * Size of the buffer (starting at @e hdr). + */ + size_t curr_buf; + + /** + * How many bytes in buffer have we already processed? + */ + size_t off; + + /** + * How many bytes in buffer are valid right now? + */ + size_t pos; + + /** + * Beginning of the buffer. Typed like this to force alignment. + */ + struct GNUNET_MessageHeader *hdr; + +}; + + +/** + * Create a message stream tokenizer. + * + * @param cb function to call on completed messages + * @param cb_cls closure for @a cb + * @return handle to tokenizer + */ +struct GNUNET_MessageStreamTokenizer * +GNUNET_MST_create (GNUNET_MessageTokenizerCallback cb, + void *cb_cls) +{ + struct GNUNET_MessageStreamTokenizer *ret; + + ret = GNUNET_new (struct GNUNET_MessageStreamTokenizer); + ret->hdr = GNUNET_malloc (GNUNET_SERVER_MIN_BUFFER_SIZE); + ret->curr_buf = GNUNET_SERVER_MIN_BUFFER_SIZE; + ret->cb = cb; + ret->cb_cls = cb_cls; + return ret; +} + + +/** + * Add incoming data to the receive buffer and call the + * callback for all complete messages. + * + * @param mst tokenizer to use + * @param buf input data to add + * @param size number of bytes in @a buf + * @param purge should any excess bytes in the buffer be discarded + * (i.e. for packet-based services like UDP) + * @param one_shot only call callback once, keep rest of message in buffer + * @return #GNUNET_OK if we are done processing (need more data) + * #GNUNET_NO if @a one_shot was set and we have another message ready + * #GNUNET_SYSERR if the data stream is corrupt + */ +int +GNUNET_MST_from_buffer (struct GNUNET_MessageStreamTokenizer *mst, + const char *buf, + size_t size, + int purge, + int one_shot) +{ + const struct GNUNET_MessageHeader *hdr; + size_t delta; + uint16_t want; + char *ibuf; + int need_align; + unsigned long offset; + int ret; + + GNUNET_assert (mst->off <= mst->pos); + GNUNET_assert (mst->pos <= mst->curr_buf); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Server-mst receives %u bytes with %u bytes already in private buffer\n", + (unsigned int) size, + (unsigned int) (mst->pos - mst->off)); + ret = GNUNET_OK; + ibuf = (char *) mst->hdr; + while (mst->pos > 0) + { +do_align: + GNUNET_assert (mst->pos >= mst->off); + if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) || + (0 != (mst->off % ALIGN_FACTOR))) + { + /* need to align or need more space */ + mst->pos -= mst->off; + memmove (ibuf, &ibuf[mst->off], mst->pos); + mst->off = 0; + } + if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) + { + delta = + GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - + (mst->pos - mst->off), size); + GNUNET_memcpy (&ibuf[mst->pos], buf, delta); + mst->pos += delta; + buf += delta; + size -= delta; + } + if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) + { + if (purge) + { + mst->off = 0; + mst->pos = 0; + } + return GNUNET_OK; + } + hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; + want = ntohs (hdr->size); + if (want < sizeof (struct GNUNET_MessageHeader)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if ( (mst->curr_buf - mst->off < want) && + (mst->off > 0) ) + { + /* can get more space by moving */ + mst->pos -= mst->off; + memmove (ibuf, &ibuf[mst->off], mst->pos); + mst->off = 0; + } + if (mst->curr_buf < want) + { + /* need to get more space by growing buffer */ + GNUNET_assert (0 == mst->off); + mst->hdr = GNUNET_realloc (mst->hdr, want); + ibuf = (char *) mst->hdr; + mst->curr_buf = want; + } + hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; + if (mst->pos - mst->off < want) + { + delta = GNUNET_MIN (want - (mst->pos - mst->off), size); + GNUNET_assert (mst->pos + delta <= mst->curr_buf); + GNUNET_memcpy (&ibuf[mst->pos], buf, delta); + mst->pos += delta; + buf += delta; + size -= delta; + } + if (mst->pos - mst->off < want) + { + if (purge) + { + mst->off = 0; + mst->pos = 0; + } + return GNUNET_OK; + } + if (one_shot == GNUNET_SYSERR) + { + /* cannot call callback again, but return value saying that + * we have another full message in the buffer */ + ret = GNUNET_NO; + goto copy; + } + if (one_shot == GNUNET_YES) + one_shot = GNUNET_SYSERR; + mst->off += want; + if (GNUNET_SYSERR == mst->cb (mst->cb_cls, + hdr)) + return GNUNET_SYSERR; + if (mst->off == mst->pos) + { + /* reset to beginning of buffer, it's free right now! */ + mst->off = 0; + mst->pos = 0; + } + } + GNUNET_assert (0 == mst->pos); + while (size > 0) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Server-mst has %u bytes left in inbound buffer\n", + (unsigned int) size); + if (size < sizeof (struct GNUNET_MessageHeader)) + break; + offset = (unsigned long) buf; + need_align = (0 != (offset % ALIGN_FACTOR)) ? GNUNET_YES : GNUNET_NO; + if (GNUNET_NO == need_align) + { + /* can try to do zero-copy and process directly from original buffer */ + hdr = (const struct GNUNET_MessageHeader *) buf; + want = ntohs (hdr->size); + if (want < sizeof (struct GNUNET_MessageHeader)) + { + GNUNET_break_op (0); + mst->off = 0; + return GNUNET_SYSERR; + } + if (size < want) + break; /* or not: buffer incomplete, so copy to private buffer... */ + if (one_shot == GNUNET_SYSERR) + { + /* cannot call callback again, but return value saying that + * we have another full message in the buffer */ + ret = GNUNET_NO; + goto copy; + } + if (one_shot == GNUNET_YES) + one_shot = GNUNET_SYSERR; + if (GNUNET_SYSERR == mst->cb (mst->cb_cls, + hdr)) + return GNUNET_SYSERR; + buf += want; + size -= want; + } + else + { + /* need to copy to private buffer to align; + * yes, we go a bit more spagetti than usual here */ + goto do_align; + } + } +copy: + if ((size > 0) && (!purge)) + { + if (size + mst->pos > mst->curr_buf) + { + mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos); + ibuf = (char *) mst->hdr; + mst->curr_buf = size + mst->pos; + } + GNUNET_assert (size + mst->pos <= mst->curr_buf); + GNUNET_memcpy (&ibuf[mst->pos], buf, size); + mst->pos += size; + } + if (purge) + { + mst->off = 0; + mst->pos = 0; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Server-mst leaves %u bytes in private buffer\n", + (unsigned int) (mst->pos - mst->off)); + return ret; +} + + +/** + * Add incoming data to the receive buffer and call the + * callback for all complete messages. + * + * @param mst tokenizer to use + * @param buf input data to add + * @param size number of bytes in @a buf + * @param purge should any excess bytes in the buffer be discarded + * (i.e. for packet-based services like UDP) + * @param one_shot only call callback once, keep rest of message in buffer + * @return #GNUNET_OK if we are done processing (need more data) + * #GNUNET_NO if one_shot was set and we have another message ready + * #GNUNET_SYSERR if the data stream is corrupt + */ +int +GNUNET_MST_read (struct GNUNET_MessageStreamTokenizer *mst, + struct GNUNET_NETWORK_Handle *sock, + int purge, + int one_shot) +{ + GNUNET_assert (0); // not implemented + return GNUNET_SYSERR; +} + + +/** + * Obtain the next message from the @a mst, assuming that + * there are more unprocessed messages in the internal buffer + * of the @a mst. + * + * @param mst tokenizer to use + * @param one_shot only call callback once, keep rest of message in buffer + * @return #GNUNET_OK if we are done processing (need more data) + * #GNUNET_NO if one_shot was set and we have another message ready + * #GNUNET_SYSERR if the data stream is corrupt + */ +int +GNUNET_MST_next (struct GNUNET_MessageStreamTokenizer *mst, + int one_shot) +{ + return GNUNET_MST_from_buffer (mst, + NULL, + 0, + GNUNET_NO, + one_shot); +} + + +/** + * Destroys a tokenizer. + * + * @param mst tokenizer to destroy + */ +void +GNUNET_MST_destroy (struct GNUNET_MessageStreamTokenizer *mst) +{ + GNUNET_free (mst->hdr); + GNUNET_free (mst); +} + + + +/* end of server_mst.c */ diff --git a/src/util/service_new.c b/src/util/service_new.c index fc47289f6..34f96a598 100644 --- a/src/util/service_new.c +++ b/src/util/service_new.c @@ -446,24 +446,6 @@ service_mq_send (struct GNUNET_MQ_Handle *mq, } -/** - * Implements the destruction of a message queue. Implementations - * must not free @a mq, but should take care of @a impl_state. - * Not sure there is anything to do here! (FIXME!) - * - * @param mq the message queue to destroy - * @param impl_state state of the implementation - */ -static void -service_mq_destroy (struct GNUNET_MQ_Handle *mq, - void *impl_state) -{ - struct GNUNET_SERVICE_Client *client = cls; - - // FIXME -} - - /** * Implementation function that cancels the currently sent message. * @@ -476,7 +458,8 @@ service_mq_cancel (struct GNUNET_MQ_Handle *mq, { struct GNUNET_SERVICE_Client *client = cls; - // FIXME: semantics? What to do!? + // FIXME: stop transmission! (must be possible, otherwise + // we must have told MQ that the message was sent!) } @@ -561,7 +544,7 @@ start_client (struct GNUNET_SERVICE_Handle *sh, client->sh = sh; client->sock = csock; client->mq = GNUNET_MQ_queue_for_callbacks (&service_mq_send, - &service_mq_destroy, + NULL, &service_mq_cancel, client, sh->handlers, -- 2.25.1