X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Futil%2Fserver_mst.c;h=27c95815a40b53c088b2dab534e7830203ff8c39;hb=739ac84730e1d5f1b5affdeaedec11a986d2faa6;hp=53cae7035577c79d95db697b143de5c3ea61f44f;hpb=c7f9e2399e5d703f0137ebb0038173c56894b8e2;p=oweals%2Fgnunet.git diff --git a/src/util/server_mst.c b/src/util/server_mst.c index 53cae7035..27c95815a 100644 --- a/src/util/server_mst.c +++ b/src/util/server_mst.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2009 Christian Grothoff (and other contributing authors) + (C) 2010 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -31,6 +31,12 @@ #include "gnunet_server_lib.h" #include "gnunet_time_lib.h" +#if HAVE_UNALIGNED_64_ACCESS +#define ALIGN_FACTOR 4 +#else +#define ALIGN_FACTOR 8 +#endif + /** * Handle to a message stream tokenizer. @@ -38,18 +44,38 @@ struct GNUNET_SERVER_MessageStreamTokenizer { - size_t maxbuf; - - size_t off; + /** + * Function to call on completed messages. + */ + GNUNET_SERVER_MessageTokenizerCallback cb; + + /** + * Closure for cb. + */ + void *cb_cls; + /** + * Client to pass to cb. + */ void *client_identity; - GNUNET_SERVER_MessageTokenizerCallback cb; + /** + * Size of the buffer (starting at 'hdr'). + */ + size_t maxbuf; - void *cb_cls; + /** + * How many bytes in buffer have we already processed? + */ + size_t off; + + /** + * How many bytes in buffer are valid right now? + */ + size_t pos; /** - * Beginning of the buffer. + * Beginning of the buffer. Typed like this to force alignment. */ struct GNUNET_MessageHeader hdr; @@ -64,6 +90,8 @@ struct GNUNET_SERVER_MessageStreamTokenizer * GNUNET_SERVER_MAX_MESSAGE_SIZE) * @param client_identity ID of client for which this is a buffer, * can be NULL (will be passed back to 'cb') + * @param cb function to call on completed messages + * @param cb_cls closure for cb * @return handle to tokenizer */ struct GNUNET_SERVER_MessageStreamTokenizer * @@ -92,107 +120,158 @@ GNUNET_SERVER_mst_create (size_t maxbuf, * @param size number of bytes in buf * @param purge should any excess bytes in the buffer be discarded * (i.e. for packet-based services like UDP) - * @return GNUNET_NO if the data stream is corrupt - * GNUNET_SYSERR if the data stream is corrupt beyond repair + * @param one_shot only call callback once, keep rest of message in buffer + * @return GNUNET_OK if we are done processing (need more data) + * GNUNET_NO if one_shot was set and we have another message ready + * GNUNET_SYSERR if the data stream is corrupt */ int GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst, const char *buf, size_t size, - int purge) + int purge, + int one_shot) { const struct GNUNET_MessageHeader *hdr; size_t delta; - size_t want; + uint16_t want; char *ibuf; int need_align; unsigned long offset; + int ret; + ret = GNUNET_OK; ibuf = (char*) &mst->hdr; - if (mst->off > 0) + if (mst->pos > 0) { do_align: - if (mst->off < sizeof (struct GNUNET_MessageHeader)) + if ( (mst->maxbuf - mst->off < sizeof (struct GNUNET_MessageHeader)) || + (0 != (mst->off % ALIGN_FACTOR)) ) + { + /* need to align or need more space */ + mst->pos -= mst->off; + memmove (ibuf, + &ibuf[mst->off], + mst->pos); + mst->off = 0; + } + if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) { - delta = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - mst->off, + delta = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - (mst->pos - mst->off), size); - memcpy (&ibuf[mst->off], + memcpy (&ibuf[mst->pos], buf, delta); - mst->off += delta; + mst->pos += delta; buf += delta; size -= delta; } - if (mst->off < sizeof (struct GNUNET_MessageHeader)) + if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader)) { if (purge) - mst->off = 0; + { + mst->off = 0; + mst->pos = 0; + } return GNUNET_OK; } - want = ntohs (mst->hdr.size); + hdr = (const struct GNUNET_MessageHeader*) &ibuf[mst->off]; + want = ntohs (hdr->size); if (want < sizeof (struct GNUNET_MessageHeader)) { GNUNET_break_op (0); - if (purge) - return GNUNET_NO; return GNUNET_SYSERR; } - if (want < mst->off) + if (mst->maxbuf - mst->off < want) { - delta = GNUNET_MIN (want - mst->off, + /* need more space */ + mst->pos -= mst->off; + memmove (ibuf, + &ibuf[mst->off], + mst->pos); + mst->off = 0; + } + if (mst->pos - mst->off < want) + { + delta = GNUNET_MIN (want - (mst->pos - mst->off), size); - memcpy (&ibuf[mst->off], + memcpy (&ibuf[mst->pos], buf, delta); - mst->off += delta; + mst->pos += delta; buf += delta; size -= delta; } - if (want < mst->off) + if (mst->pos - mst->off < want) { if (purge) - mst->off = 0; + { + mst->off = 0; + mst->pos = 0; + } return GNUNET_OK; } - mst->cb (mst->cb_cls, mst->client_identity, &mst->hdr); - mst->off = 0; + if (one_shot == GNUNET_SYSERR) + { + /* cannot call callback again, but return value saying that + we have another full message in the buffer */ + ret = GNUNET_NO; + goto copy; + } + if (one_shot == GNUNET_YES) + one_shot = GNUNET_SYSERR; + mst->cb (mst->cb_cls, mst->client_identity, hdr); + mst->off += want; + if (mst->off == mst->pos) + { + /* reset to beginning of buffer, it's free right now! */ + mst->off = 0; + mst->pos = 0; + } } while (size > 0) { if (size < sizeof (struct GNUNET_MessageHeader)) break; offset = (unsigned long) buf; -#if HAVE_UNALIGNED_64_ACCESS - need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO; -#else - need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO; -#endif + need_align = (0 != offset % ALIGN_FACTOR) ? GNUNET_YES : GNUNET_NO; if (GNUNET_NO == need_align) { - /* can try to do zero-copy */ + /* can try to do zero-copy and process directly from original buffer */ hdr = (const struct GNUNET_MessageHeader *) buf; want = ntohs (hdr->size); if (size < want) - break; /* or not, buffer incomplete... */ + break; /* or not, buffer incomplete, so copy to private buffer... */ + if (one_shot == GNUNET_SYSERR) + { + /* cannot call callback again, but return value saying that + we have another full message in the buffer */ + ret = GNUNET_NO; + goto copy; + } + if (one_shot == GNUNET_YES) + one_shot = GNUNET_SYSERR; mst->cb (mst->cb_cls, mst->client_identity, hdr); buf += want; size -= want; } else { - /* yes, we go a bit more spagetti than usual here */ + /* need to copy to private buffer to align; + yes, we go a bit more spagetti than usual here */ goto do_align; } } + copy: if ( (size > 0) && (! purge) ) { - memcpy (&mst->hdr, buf, size); - mst->off = size; - size = 0; + GNUNET_assert (mst->pos + size <= mst->maxbuf); + memcpy (&ibuf[mst->pos], buf, size); + mst->pos += size; } if (purge) mst->off = 0; - return GNUNET_OK; + return ret; }