Fixed one serious bug, working on another. Still very broken.
[oweals/gnunet.git] / src / util / server_mst.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file util/server_mst.c
23  * @brief convenience functions for handling inbound message buffers
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29
30
31 #if HAVE_UNALIGNED_64_ACCESS
32 #define ALIGN_FACTOR 4
33 #else
34 #define ALIGN_FACTOR 8
35 #endif
36
37 #define LOG(kind,...) GNUNET_log_from (kind, "util", __VA_ARGS__)
38
39
40 /**
41  * Handle to a message stream tokenizer.
42  */
43 struct GNUNET_SERVER_MessageStreamTokenizer
44 {
45
46   /**
47    * Function to call on completed messages.
48    */
49   GNUNET_SERVER_MessageTokenizerCallback cb;
50
51   /**
52    * Closure for cb.
53    */
54   void *cb_cls;
55
56   /**
57    * Size of the buffer (starting at 'hdr').
58    */
59   size_t curr_buf;
60
61   /**
62    * How many bytes in buffer have we already processed?
63    */
64   size_t off;
65
66   /**
67    * How many bytes in buffer are valid right now?
68    */
69   size_t pos;
70
71   /**
72    * Beginning of the buffer.  Typed like this to force alignment.
73    */
74   struct GNUNET_MessageHeader *hdr;
75
76 };
77
78
79
80 /**
81  * Create a message stream tokenizer.
82  *
83  * @param cb function to call on completed messages
84  * @param cb_cls closure for @a cb
85  * @return handle to tokenizer
86  */
87 struct GNUNET_SERVER_MessageStreamTokenizer *
88 GNUNET_SERVER_mst_create (GNUNET_SERVER_MessageTokenizerCallback cb,
89                           void *cb_cls)
90 {
91   struct GNUNET_SERVER_MessageStreamTokenizer *ret;
92
93   ret = GNUNET_new (struct GNUNET_SERVER_MessageStreamTokenizer);
94   ret->hdr = GNUNET_malloc (GNUNET_SERVER_MIN_BUFFER_SIZE);
95   ret->curr_buf = GNUNET_SERVER_MIN_BUFFER_SIZE;
96   ret->cb = cb;
97   ret->cb_cls = cb_cls;
98   return ret;
99 }
100
101
102 /**
103  * Add incoming data to the receive buffer and call the
104  * callback for all complete messages.
105  *
106  * @param mst tokenizer to use
107  * @param client_identity ID of client for which this is a buffer
108  * @param buf input data to add
109  * @param size number of bytes in @a buf
110  * @param purge should any excess bytes in the buffer be discarded
111  *       (i.e. for packet-based services like UDP)
112  * @param one_shot only call callback once, keep rest of message in buffer
113  * @return #GNUNET_OK if we are done processing (need more data)
114  *         #GNUNET_NO if @a one_shot was set and we have another message ready
115  *         #GNUNET_SYSERR if the data stream is corrupt
116  */
117 int
118 GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst,
119                            void *client_identity,
120                            const char *buf, size_t size,
121                            int purge, int one_shot)
122 {
123   const struct GNUNET_MessageHeader *hdr;
124   size_t delta;
125   uint16_t want;
126   char *ibuf;
127   int need_align;
128   unsigned long offset;
129   int ret;
130
131   GNUNET_assert (mst->off <= mst->pos);
132   GNUNET_assert (mst->pos <= mst->curr_buf);
133   LOG (GNUNET_ERROR_TYPE_DEBUG,
134        "Server-mst receives %u bytes with %u bytes already in private buffer\n",
135        (unsigned int) size, (unsigned int) (mst->pos - mst->off));
136   ret = GNUNET_OK;
137   ibuf = (char *) mst->hdr;
138   while (mst->pos > 0)
139   {
140 do_align:
141     GNUNET_assert (mst->pos >= mst->off);
142     if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) ||
143         (0 != (mst->off % ALIGN_FACTOR)))
144     {
145       /* need to align or need more space */
146       mst->pos -= mst->off;
147       memmove (ibuf, &ibuf[mst->off], mst->pos);
148       mst->off = 0;
149     }
150     if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
151     {
152       delta =
153           GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) -
154                       (mst->pos - mst->off), size);
155       memcpy (&ibuf[mst->pos], buf, delta);
156       mst->pos += delta;
157       buf += delta;
158       size -= delta;
159     }
160     if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
161     {
162       if (purge)
163       {
164         mst->off = 0;
165         mst->pos = 0;
166       }
167       return GNUNET_OK;
168     }
169     hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
170     want = ntohs (hdr->size);
171     if (want < sizeof (struct GNUNET_MessageHeader))
172     {
173       GNUNET_break_op (0);
174       return GNUNET_SYSERR;
175     }
176     if ( (mst->curr_buf - mst->off < want) &&
177          (mst->off > 0) )
178     {
179       /* can get more space by moving */
180       mst->pos -= mst->off;
181       memmove (ibuf, &ibuf[mst->off], mst->pos);
182       mst->off = 0;
183     }
184     if (mst->curr_buf < want)
185     {
186       /* need to get more space by growing buffer */
187       GNUNET_assert (0 == mst->off);
188       mst->hdr = GNUNET_realloc (mst->hdr, want);
189       ibuf = (char *) mst->hdr;
190       mst->curr_buf = want;
191     }
192     hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
193     if (mst->pos - mst->off < want)
194     {
195       delta = GNUNET_MIN (want - (mst->pos - mst->off), size);
196       GNUNET_assert (mst->pos + delta <= mst->curr_buf);
197       memcpy (&ibuf[mst->pos], buf, delta);
198       mst->pos += delta;
199       buf += delta;
200       size -= delta;
201     }
202     if (mst->pos - mst->off < want)
203     {
204       if (purge)
205       {
206         mst->off = 0;
207         mst->pos = 0;
208       }
209       return GNUNET_OK;
210     }
211     if (one_shot == GNUNET_SYSERR)
212     {
213       /* cannot call callback again, but return value saying that
214        * we have another full message in the buffer */
215       ret = GNUNET_NO;
216       goto copy;
217     }
218     if (one_shot == GNUNET_YES)
219       one_shot = GNUNET_SYSERR;
220     mst->off += want;
221     if (GNUNET_SYSERR == mst->cb (mst->cb_cls, client_identity, hdr))
222       return GNUNET_SYSERR;
223     if (mst->off == mst->pos)
224     {
225       /* reset to beginning of buffer, it's free right now! */
226       mst->off = 0;
227       mst->pos = 0;
228     }
229   }
230   GNUNET_assert (0 == mst->pos);
231   while (size > 0)
232   {
233     LOG (GNUNET_ERROR_TYPE_DEBUG,
234          "Server-mst has %u bytes left in inbound buffer\n",
235          (unsigned int) size);
236     if (size < sizeof (struct GNUNET_MessageHeader))
237       break;
238     offset = (unsigned long) buf;
239     need_align = (0 != (offset % ALIGN_FACTOR)) ? GNUNET_YES : GNUNET_NO;
240     if (GNUNET_NO == need_align)
241     {
242       /* can try to do zero-copy and process directly from original buffer */
243       hdr = (const struct GNUNET_MessageHeader *) buf;
244       want = ntohs (hdr->size);
245       if (want < sizeof (struct GNUNET_MessageHeader))
246       {
247         GNUNET_break_op (0);
248         mst->off = 0;
249         return GNUNET_SYSERR;
250       }
251       if (size < want)
252         break;                  /* or not: buffer incomplete, so copy to private buffer... */
253       if (one_shot == GNUNET_SYSERR)
254       {
255         /* cannot call callback again, but return value saying that
256          * we have another full message in the buffer */
257         ret = GNUNET_NO;
258         goto copy;
259       }
260       if (one_shot == GNUNET_YES)
261         one_shot = GNUNET_SYSERR;
262       if (GNUNET_SYSERR == mst->cb (mst->cb_cls, client_identity, hdr))
263         return GNUNET_SYSERR;
264       buf += want;
265       size -= want;
266     }
267     else
268     {
269       /* need to copy to private buffer to align;
270        * yes, we go a bit more spagetti than usual here */
271       goto do_align;
272     }
273   }
274 copy:
275   if ((size > 0) && (!purge))
276   {
277     if (size + mst->pos > mst->curr_buf)
278     {
279       mst->hdr = GNUNET_realloc (mst->hdr, size + mst->pos);
280       ibuf = (char *) mst->hdr;
281       mst->curr_buf = size + mst->pos;
282     }
283     GNUNET_assert (size + mst->pos <= mst->curr_buf);
284     memcpy (&ibuf[mst->pos], buf, size);
285     mst->pos += size;
286   }
287   if (purge)
288   {
289     mst->off = 0;
290     mst->pos = 0;
291   }
292   LOG (GNUNET_ERROR_TYPE_DEBUG,
293        "Server-mst leaves %u bytes in private buffer\n",
294        (unsigned int) (mst->pos - mst->off));
295   return ret;
296 }
297
298
299 /**
300  * Destroys a tokenizer.
301  *
302  * @param mst tokenizer to destroy
303  */
304 void
305 GNUNET_SERVER_mst_destroy (struct GNUNET_SERVER_MessageStreamTokenizer *mst)
306 {
307   GNUNET_free (mst->hdr);
308   GNUNET_free (mst);
309 }
310
311
312
313 /* end of server_mst.c */