/*
This file is part of GNUnet.
- Copyright (C) 2010, 2016 GNUnet e.V.
+ Copyright (C) 2010, 2016, 2017 GNUnet e.V.
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
- Boston, MA 02110-1301, USA.
+ Affero General Public License for more details.
*/
/**
#define ALIGN_FACTOR 8
#endif
-#define LOG(kind,...) GNUNET_log_from (kind, "util", __VA_ARGS__)
+#define LOG(kind,...) GNUNET_log_from (kind, "util-mst", __VA_ARGS__)
/**
struct GNUNET_MessageStreamTokenizer *ret;
ret = GNUNET_new (struct GNUNET_MessageStreamTokenizer);
- ret->hdr = GNUNET_malloc (GNUNET_SERVER_MIN_BUFFER_SIZE);
- ret->curr_buf = GNUNET_SERVER_MIN_BUFFER_SIZE;
+ ret->hdr = GNUNET_malloc (GNUNET_MIN_MESSAGE_SIZE);
+ ret->curr_buf = GNUNET_MIN_MESSAGE_SIZE;
ret->cb = cb;
ret->cb_cls = cb_cls;
return ret;
int need_align;
unsigned long offset;
int ret;
+ int cbret;
GNUNET_assert (mst->off <= mst->pos);
GNUNET_assert (mst->pos <= mst->curr_buf);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Server-mst receives %u bytes with %u bytes already in private buffer\n",
+ "MST receives %u bytes with %u bytes already in private buffer\n",
(unsigned int) size,
(unsigned int) (mst->pos - mst->off));
ret = GNUNET_OK;
{
/* need to align or need more space */
mst->pos -= mst->off;
- memmove (ibuf, &ibuf[mst->off], mst->pos);
+ memmove (ibuf,
+ &ibuf[mst->off],
+ mst->pos);
mst->off = 0;
}
if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
{
- delta =
- GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) -
- (mst->pos - mst->off), size);
- GNUNET_memcpy (&ibuf[mst->pos], buf, delta);
+ delta
+ = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader)
+ - (mst->pos - mst->off),
+ size);
+ GNUNET_memcpy (&ibuf[mst->pos],
+ buf,
+ delta);
mst->pos += delta;
buf += delta;
size -= delta;
{
/* can get more space by moving */
mst->pos -= mst->off;
- memmove (ibuf, &ibuf[mst->off], mst->pos);
+ memmove (ibuf,
+ &ibuf[mst->off],
+ mst->pos);
mst->off = 0;
}
if (mst->curr_buf < want)
{
/* need to get more space by growing buffer */
GNUNET_assert (0 == mst->off);
- mst->hdr = GNUNET_realloc (mst->hdr, want);
+ mst->hdr = GNUNET_realloc (mst->hdr,
+ want);
ibuf = (char *) mst->hdr;
mst->curr_buf = want;
}
hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
if (mst->pos - mst->off < want)
{
- delta = GNUNET_MIN (want - (mst->pos - mst->off), size);
+ delta = GNUNET_MIN (want - (mst->pos - mst->off),
+ size);
GNUNET_assert (mst->pos + delta <= mst->curr_buf);
- GNUNET_memcpy (&ibuf[mst->pos], buf, delta);
+ GNUNET_memcpy (&ibuf[mst->pos],
+ buf,
+ delta);
mst->pos += delta;
buf += delta;
size -= delta;
if (one_shot == GNUNET_YES)
one_shot = GNUNET_SYSERR;
mst->off += want;
- if (GNUNET_SYSERR == mst->cb (mst->cb_cls,
- hdr))
+ if (GNUNET_OK !=
+ (cbret = mst->cb (mst->cb_cls,
+ hdr)))
+ {
+ if (GNUNET_SYSERR == cbret)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failure processing message of type %u and size %u\n",
+ ntohs (hdr->type),
+ ntohs (hdr->size));
return GNUNET_SYSERR;
+ }
if (mst->off == mst->pos)
{
/* reset to beginning of buffer, it's free right now! */
}
if (one_shot == GNUNET_YES)
one_shot = GNUNET_SYSERR;
- if (GNUNET_SYSERR == mst->cb (mst->cb_cls,
- hdr))
+ if (GNUNET_OK !=
+ (cbret = mst->cb (mst->cb_cls,
+ hdr)))
+ {
+ if (GNUNET_SYSERR == cbret)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failure processing message of type %u and size %u\n",
+ ntohs (hdr->type),
+ ntohs (hdr->size));
return GNUNET_SYSERR;
+ }
buf += want;
size -= want;
}
{
if (size + mst->pos > mst->curr_buf)
{
- mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos);
+ mst->hdr = GNUNET_realloc (mst->hdr,
+ size + mst->pos);
ibuf = (char *) mst->hdr;
mst->curr_buf = size + mst->pos;
}
GNUNET_assert (size + mst->pos <= mst->curr_buf);
- GNUNET_memcpy (&ibuf[mst->pos], buf, size);
+ GNUNET_memcpy (&ibuf[mst->pos],
+ buf,
+ size);
mst->pos += size;
}
if (purge)
int purge,
int one_shot)
{
- GNUNET_assert (0); // not implemented
- return GNUNET_SYSERR;
+ ssize_t ret;
+ size_t left;
+ char *buf;
+
+ left = mst->curr_buf - mst->pos;
+ buf = (char *) mst->hdr;
+ ret = GNUNET_NETWORK_socket_recv (sock,
+ &buf[mst->pos],
+ left);
+ if (-1 == ret)
+ {
+ if ( (EAGAIN == errno) ||
+ (EINTR == errno) )
+ return GNUNET_OK;
+ GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO,
+ "recv");
+ return GNUNET_SYSERR;
+ }
+ if (0 == ret)
+ {
+ /* other side closed connection, treat as error */
+ return GNUNET_SYSERR;
+ }
+ mst->pos += ret;
+ return GNUNET_MST_from_buffer (mst,
+ NULL,
+ 0,
+ purge,
+ one_shot);
}