X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Futil%2Fserver_mst.c;h=0a686a079e6323e44ef949a774fd4c6ef991c21c;hb=60de5f48cbfc3868570284e91415ca7e06c390e1;hp=8df4e4b7d3af2faff255e8c4ec15716675bb899c;hpb=09082d4872f060aedf1364fb5ebc1584ee4c47d6;p=oweals%2Fgnunet.git diff --git a/src/util/server_mst.c b/src/util/server_mst.c index 8df4e4b7d..0a686a079 100644 --- a/src/util/server_mst.c +++ b/src/util/server_mst.c @@ -1,10 +1,10 @@ /* This file is part of GNUnet. - (C) 2009 Christian Grothoff (and other contributing authors) + Copyright (C) 2010 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 2, or (at your + by the Free Software Foundation; either version 3, or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but @@ -14,8 +14,8 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** @@ -25,11 +25,16 @@ */ #include "platform.h" -#include "gnunet_common.h" -#include "gnunet_connection_lib.h" -#include "gnunet_scheduler_lib.h" -#include "gnunet_server_lib.h" -#include "gnunet_time_lib.h" +#include "gnunet_util_lib.h" + + +#if HAVE_UNALIGNED_64_ACCESS +#define ALIGN_FACTOR 4 +#else +#define ALIGN_FACTOR 8 +#endif + +#define LOG(kind,...) GNUNET_log_from (kind, "util", __VA_ARGS__) /** @@ -38,20 +43,35 @@ struct GNUNET_SERVER_MessageStreamTokenizer { - size_t maxbuf; + /** + * Function to call on completed messages. + */ + GNUNET_SERVER_MessageTokenizerCallback cb; - size_t off; + /** + * Closure for cb. + */ + void *cb_cls; - void *client_identity; + /** + * Size of the buffer (starting at 'hdr'). + */ + size_t curr_buf; - GNUNET_SERVER_MessageTokenizerCallback cb; + /** + * How many bytes in buffer have we already processed? + */ + size_t off; - void *cb_cls; + /** + * How many bytes in buffer are valid right now? + */ + size_t pos; /** - * Beginning of the buffer. + * Beginning of the buffer. Typed like this to force alignment. */ - struct GNUNET_MessageHeader hdr; + struct GNUNET_MessageHeader *hdr; }; @@ -60,23 +80,19 @@ struct GNUNET_SERVER_MessageStreamTokenizer /** * Create a message stream tokenizer. * - * @param maxbuf maximum message size to support (typically - * GNUNET_SERVER_MAX_MESSAGE_SIZE) - * @param client_identity ID of client for which this is a buffer, - * can be NULL (will be passed back to 'cb') + * @param cb function to call on completed messages + * @param cb_cls closure for @a cb * @return handle to tokenizer */ struct GNUNET_SERVER_MessageStreamTokenizer * -GNUNET_SERVER_mst_create (size_t maxbuf, - void *client_identity, - GNUNET_SERVER_MessageTokenizerCallback cb, - void *cb_cls) +GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb, + void *cb_cls) { struct GNUNET_SERVER_MessageStreamTokenizer *ret; - ret = GNUNET_malloc (maxbuf + sizeof (struct GNUNET_SERVER_MessageStreamTokenizer)); - ret->maxbuf = maxbuf; - ret->client_identity = client_identity; + ret = GNUNET_new (struct GNUNET_SERVER_MessageStreamTokenizer); + ret->hdr = GNUNET_malloc (GNUNET_SERVER_MIN_BUFFER_SIZE); + ret->curr_buf = GNUNET_SERVER_MIN_BUFFER_SIZE; ret->cb = cb; ret->cb_cls = cb_cls; return ret; @@ -88,128 +104,194 @@ GNUNET_SERVER_mst_create (size_t maxbuf, * callback for all complete messages. * * @param mst tokenizer to use + * @param client_identity ID of client for which this is a buffer * @param buf input data to add - * @param size number of bytes in buf - * @param purge should any excess bytes in the buffer be discarded + * @param size number of bytes in @a buf + * @param purge should any excess bytes in the buffer be discarded * (i.e. for packet-based services like UDP) * @param one_shot only call callback once, keep rest of message in buffer - * @return GNUNET_OK if we are done processing (need more data) - * GNUNET_NO if one_shot was set and we have another message ready - * GNUNET_SYSERR if the data stream is corrupt + * @return #GNUNET_OK if we are done processing (need more data) + * #GNUNET_NO if @a one_shot was set and we have another message ready + * #GNUNET_SYSERR if the data stream is corrupt */ int GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, - const char *buf, - size_t size, - int purge, - int one_shot) + void *client_identity, + const char *buf, size_t size, + int purge, int one_shot) { const struct GNUNET_MessageHeader *hdr; size_t delta; - size_t want; + uint16_t want; char *ibuf; int need_align; unsigned long offset; int ret; + GNUNET_assert (mst->off <= mst->pos); + GNUNET_assert (mst->pos <= mst->curr_buf); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Server-mst receives %u bytes with %u bytes already in private buffer\n", + (unsigned int) size, (unsigned int) (mst->pos - mst->off)); ret = GNUNET_OK; - ibuf = (char*) &mst->hdr; - if (mst->off > 0) + ibuf = (char *) mst->hdr; + while (mst->pos > 0) + { +do_align: + GNUNET_assert (mst->pos >= mst->off); + if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) || + (0 != (mst->off % ALIGN_FACTOR))) + { + /* need to align or need more space */ + mst->pos -= mst->off; + memmove (ibuf, &ibuf[mst->off], mst->pos); + mst->off = 0; + } + if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) { - do_align: - if (mst->off < sizeof (struct GNUNET_MessageHeader)) - { - delta = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - mst->off, - size); - memcpy (&ibuf[mst->off], - buf, - delta); - mst->off += delta; - buf += delta; - size -= delta; - } - if (mst->off < sizeof (struct GNUNET_MessageHeader)) - { - if (purge) - mst->off = 0; - return GNUNET_OK; - } - want = ntohs (mst->hdr.size); + delta = + GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - + (mst->pos - mst->off), size); + memcpy (&ibuf[mst->pos], buf, delta); + mst->pos += delta; + buf += delta; + size -= delta; + } + if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) + { + if (purge) + { + mst->off = 0; + mst->pos = 0; + } + return GNUNET_OK; + } + hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; + want = ntohs (hdr->size); + if (want < sizeof (struct GNUNET_MessageHeader)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + if ( (mst->curr_buf - mst->off < want) && + (mst->off > 0) ) + { + /* can get more space by moving */ + mst->pos -= mst->off; + memmove (ibuf, &ibuf[mst->off], mst->pos); + mst->off = 0; + } + if (mst->curr_buf < want) + { + /* need to get more space by growing buffer */ + GNUNET_assert (0 == mst->off); + mst->hdr = GNUNET_realloc (mst->hdr, want); + ibuf = (char *) mst->hdr; + mst->curr_buf = want; + } + hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off]; + if (mst->pos - mst->off < want) + { + delta = GNUNET_MIN (want - (mst->pos - mst->off), size); + GNUNET_assert (mst->pos + delta <= mst->curr_buf); + memcpy (&ibuf[mst->pos], buf, delta); + mst->pos += delta; + buf += delta; + size -= delta; + } + if (mst->pos - mst->off < want) + { + if (purge) + { + mst->off = 0; + mst->pos = 0; + } + return GNUNET_OK; + } + if (one_shot == GNUNET_SYSERR) + { + /* cannot call callback again, but return value saying that + * we have another full message in the buffer */ + ret = GNUNET_NO; + goto copy; + } + if (one_shot == GNUNET_YES) + one_shot = GNUNET_SYSERR; + mst->off += want; + if (GNUNET_SYSERR == mst->cb (mst->cb_cls, client_identity, hdr)) + return GNUNET_SYSERR; + if (mst->off == mst->pos) + { + /* reset to beginning of buffer, it's free right now! */ + mst->off = 0; + mst->pos = 0; + } + } + GNUNET_assert (0 == mst->pos); + while (size > 0) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Server-mst has %u bytes left in inbound buffer\n", + (unsigned int) size); + if (size < sizeof (struct GNUNET_MessageHeader)) + break; + offset = (unsigned long) buf; + need_align = (0 != (offset % ALIGN_FACTOR)) ? GNUNET_YES : GNUNET_NO; + if (GNUNET_NO == need_align) + { + /* can try to do zero-copy and process directly from original buffer */ + hdr = (const struct GNUNET_MessageHeader *) buf; + want = ntohs (hdr->size); if (want < sizeof (struct GNUNET_MessageHeader)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - if (want < mst->off) - { - delta = GNUNET_MIN (want - mst->off, - size); - memcpy (&ibuf[mst->off], - buf, - delta); - mst->off += delta; - buf += delta; - size -= delta; - } - if (want < mst->off) - { - if (purge) - mst->off = 0; - return GNUNET_OK; - } + { + GNUNET_break_op (0); + mst->off = 0; + return GNUNET_SYSERR; + } + if (size < want) + break; /* or not: buffer incomplete, so copy to private buffer... */ if (one_shot == GNUNET_SYSERR) - { - ret = GNUNET_NO; - goto copy; - } + { + /* cannot call callback again, but return value saying that + * we have another full message in the buffer */ + ret = GNUNET_NO; + goto copy; + } if (one_shot == GNUNET_YES) - one_shot = GNUNET_SYSERR; - mst->cb (mst->cb_cls, mst->client_identity, &mst->hdr); - mst->off = 0; + one_shot = GNUNET_SYSERR; + if (GNUNET_SYSERR == mst->cb (mst->cb_cls, client_identity, hdr)) + return GNUNET_SYSERR; + buf += want; + size -= want; } - while (size > 0) + else { - if (size < sizeof (struct GNUNET_MessageHeader)) - break; - offset = (unsigned long) buf; -#if HAVE_UNALIGNED_64_ACCESS - need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO; -#else - need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO; -#endif - if (GNUNET_NO == need_align) - { - /* can try to do zero-copy */ - hdr = (const struct GNUNET_MessageHeader *) buf; - want = ntohs (hdr->size); - if (size < want) - break; /* or not, buffer incomplete... */ - if (one_shot == GNUNET_SYSERR) - { - ret = GNUNET_NO; - goto copy; - } - if (one_shot == GNUNET_YES) - one_shot = GNUNET_SYSERR; - mst->cb (mst->cb_cls, mst->client_identity, hdr); - buf += want; - size -= want; - } - else - { - /* yes, we go a bit more spagetti than usual here */ - goto do_align; - } + /* need to copy to private buffer to align; + * yes, we go a bit more spagetti than usual here */ + goto do_align; } - copy: - if ( (size > 0) && (! purge) ) + } +copy: + if ((size > 0) && (!purge)) + { + if (size + mst->pos > mst->curr_buf) { - memcpy (&ibuf[mst->off], buf, size); - mst->off += size; - size = 0; + mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos); + ibuf = (char *) mst->hdr; + mst->curr_buf = size + mst->pos; } + GNUNET_assert (size + mst->pos <= mst->curr_buf); + memcpy (&ibuf[mst->pos], buf, size); + mst->pos += size; + } if (purge) - mst->off = 0; + { + mst->off = 0; + mst->pos = 0; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Server-mst leaves %u bytes in private buffer\n", + (unsigned int) (mst->pos - mst->off)); return ret; } @@ -222,6 +304,7 @@ GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, void GNUNET_SERVER_mst_destroy (struct GNUNET_SERVER_MessageStreamTokenizer *mst) { + GNUNET_free (mst->hdr); GNUNET_free (mst); }