Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / util / mst.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2010, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file util/mst.c
23  * @brief convenience functions for handling inbound message buffers
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29
30
31 #if HAVE_UNALIGNED_64_ACCESS
32 #define ALIGN_FACTOR 4
33 #else
34 #define ALIGN_FACTOR 8
35 #endif
36
37 #define LOG(kind,...) GNUNET_log_from (kind, "util-mst", __VA_ARGS__)
38
39
40 /**
41  * Handle to a message stream tokenizer.
42  */
43 struct GNUNET_MessageStreamTokenizer
44 {
45
46   /**
47    * Function to call on completed messages.
48    */
49   GNUNET_MessageTokenizerCallback cb;
50
51   /**
52    * Closure for @e cb.
53    */
54   void *cb_cls;
55
56   /**
57    * Size of the buffer (starting at @e hdr).
58    */
59   size_t curr_buf;
60
61   /**
62    * How many bytes in buffer have we already processed?
63    */
64   size_t off;
65
66   /**
67    * How many bytes in buffer are valid right now?
68    */
69   size_t pos;
70
71   /**
72    * Beginning of the buffer.  Typed like this to force alignment.
73    */
74   struct GNUNET_MessageHeader *hdr;
75
76 };
77
78
79 /**
80  * Create a message stream tokenizer.
81  *
82  * @param cb function to call on completed messages
83  * @param cb_cls closure for @a cb
84  * @return handle to tokenizer
85  */
86 struct GNUNET_MessageStreamTokenizer *
87 GNUNET_MST_create (GNUNET_MessageTokenizerCallback cb,
88                    void *cb_cls)
89 {
90   struct GNUNET_MessageStreamTokenizer *ret;
91
92   ret = GNUNET_new (struct GNUNET_MessageStreamTokenizer);
93   ret->hdr = GNUNET_malloc (GNUNET_MIN_MESSAGE_SIZE);
94   ret->curr_buf = GNUNET_MIN_MESSAGE_SIZE;
95   ret->cb = cb;
96   ret->cb_cls = cb_cls;
97   return ret;
98 }
99
100
101 /**
102  * Add incoming data to the receive buffer and call the
103  * callback for all complete messages.
104  *
105  * @param mst tokenizer to use
106  * @param buf input data to add
107  * @param size number of bytes in @a buf
108  * @param purge should any excess bytes in the buffer be discarded
109  *       (i.e. for packet-based services like UDP)
110  * @param one_shot only call callback once, keep rest of message in buffer
111  * @return #GNUNET_OK if we are done processing (need more data)
112  *         #GNUNET_NO if @a one_shot was set and we have another message ready
113  *         #GNUNET_SYSERR if the data stream is corrupt
114  */
115 int
116 GNUNET_MST_from_buffer (struct GNUNET_MessageStreamTokenizer *mst,
117                         const char *buf,
118                         size_t size,
119                         int purge,
120                         int one_shot)
121 {
122   const struct GNUNET_MessageHeader *hdr;
123   size_t delta;
124   uint16_t want;
125   char *ibuf;
126   int need_align;
127   unsigned long offset;
128   int ret;
129
130   GNUNET_assert (mst->off <= mst->pos);
131   GNUNET_assert (mst->pos <= mst->curr_buf);
132   LOG (GNUNET_ERROR_TYPE_DEBUG,
133        "MST receives %u bytes with %u bytes already in private buffer\n",
134        (unsigned int) size,
135        (unsigned int) (mst->pos - mst->off));
136   ret = GNUNET_OK;
137   ibuf = (char *) mst->hdr;
138   while (mst->pos > 0)
139   {
140 do_align:
141     GNUNET_assert (mst->pos >= mst->off);
142     if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) ||
143         (0 != (mst->off % ALIGN_FACTOR)))
144     {
145       /* need to align or need more space */
146       mst->pos -= mst->off;
147       memmove (ibuf,
148                &ibuf[mst->off],
149                mst->pos);
150       mst->off = 0;
151     }
152     if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
153     {
154       delta
155         = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader)
156                       - (mst->pos - mst->off),
157                       size);
158       GNUNET_memcpy (&ibuf[mst->pos],
159                      buf,
160                      delta);
161       mst->pos += delta;
162       buf += delta;
163       size -= delta;
164     }
165     if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
166     {
167       if (purge)
168       {
169         mst->off = 0;
170         mst->pos = 0;
171       }
172       return GNUNET_OK;
173     }
174     hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
175     want = ntohs (hdr->size);
176     if (want < sizeof (struct GNUNET_MessageHeader))
177     {
178       GNUNET_break_op (0);
179       return GNUNET_SYSERR;
180     }
181     if ( (mst->curr_buf - mst->off < want) &&
182          (mst->off > 0) )
183     {
184       /* can get more space by moving */
185       mst->pos -= mst->off;
186       memmove (ibuf,
187                &ibuf[mst->off],
188                mst->pos);
189       mst->off = 0;
190     }
191     if (mst->curr_buf < want)
192     {
193       /* need to get more space by growing buffer */
194       GNUNET_assert (0 == mst->off);
195       mst->hdr = GNUNET_realloc (mst->hdr,
196                                  want);
197       ibuf = (char *) mst->hdr;
198       mst->curr_buf = want;
199     }
200     hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
201     if (mst->pos - mst->off < want)
202     {
203       delta = GNUNET_MIN (want - (mst->pos - mst->off),
204                           size);
205       GNUNET_assert (mst->pos + delta <= mst->curr_buf);
206       GNUNET_memcpy (&ibuf[mst->pos],
207                      buf,
208                      delta);
209       mst->pos += delta;
210       buf += delta;
211       size -= delta;
212     }
213     if (mst->pos - mst->off < want)
214     {
215       if (purge)
216       {
217         mst->off = 0;
218         mst->pos = 0;
219       }
220       return GNUNET_OK;
221     }
222     if (one_shot == GNUNET_SYSERR)
223     {
224       /* cannot call callback again, but return value saying that
225        * we have another full message in the buffer */
226       ret = GNUNET_NO;
227       goto copy;
228     }
229     if (one_shot == GNUNET_YES)
230       one_shot = GNUNET_SYSERR;
231     mst->off += want;
232   if (GNUNET_SYSERR == mst->cb (mst->cb_cls,
233                                 hdr))
234       return GNUNET_SYSERR;
235     if (mst->off == mst->pos)
236     {
237       /* reset to beginning of buffer, it's free right now! */
238       mst->off = 0;
239       mst->pos = 0;
240     }
241   }
242   GNUNET_assert (0 == mst->pos);
243   while (size > 0)
244   {
245     LOG (GNUNET_ERROR_TYPE_DEBUG,
246          "Server-mst has %u bytes left in inbound buffer\n",
247          (unsigned int) size);
248     if (size < sizeof (struct GNUNET_MessageHeader))
249       break;
250     offset = (unsigned long) buf;
251     need_align = (0 != (offset % ALIGN_FACTOR)) ? GNUNET_YES : GNUNET_NO;
252     if (GNUNET_NO == need_align)
253     {
254       /* can try to do zero-copy and process directly from original buffer */
255       hdr = (const struct GNUNET_MessageHeader *) buf;
256       want = ntohs (hdr->size);
257       if (want < sizeof (struct GNUNET_MessageHeader))
258       {
259         GNUNET_break_op (0);
260         mst->off = 0;
261         return GNUNET_SYSERR;
262       }
263       if (size < want)
264         break;                  /* or not: buffer incomplete, so copy to private buffer... */
265       if (one_shot == GNUNET_SYSERR)
266       {
267         /* cannot call callback again, but return value saying that
268          * we have another full message in the buffer */
269         ret = GNUNET_NO;
270         goto copy;
271       }
272       if (one_shot == GNUNET_YES)
273         one_shot = GNUNET_SYSERR;
274       if (GNUNET_SYSERR == mst->cb (mst->cb_cls,
275                                     hdr))
276         return GNUNET_SYSERR;
277       buf += want;
278       size -= want;
279     }
280     else
281     {
282       /* need to copy to private buffer to align;
283        * yes, we go a bit more spagetti than usual here */
284       goto do_align;
285     }
286   }
287 copy:
288   if ((size > 0) && (!purge))
289   {
290     if (size + mst->pos > mst->curr_buf)
291     {
292       mst->hdr = GNUNET_realloc (mst->hdr,
293                                  size + mst->pos);
294       ibuf = (char *) mst->hdr;
295       mst->curr_buf = size + mst->pos;
296     }
297     GNUNET_assert (size + mst->pos <= mst->curr_buf);
298     GNUNET_memcpy (&ibuf[mst->pos],
299                    buf,
300                    size);
301     mst->pos += size;
302   }
303   if (purge)
304   {
305     mst->off = 0;
306     mst->pos = 0;
307   }
308   LOG (GNUNET_ERROR_TYPE_DEBUG,
309        "Server-mst leaves %u bytes in private buffer\n",
310        (unsigned int) (mst->pos - mst->off));
311   return ret;
312 }
313
314
315 /**
316  * Add incoming data to the receive buffer and call the
317  * callback for all complete messages.
318  *
319  * @param mst tokenizer to use
320  * @param buf input data to add
321  * @param size number of bytes in @a buf
322  * @param purge should any excess bytes in the buffer be discarded
323  *       (i.e. for packet-based services like UDP)
324  * @param one_shot only call callback once, keep rest of message in buffer
325  * @return #GNUNET_OK if we are done processing (need more data)
326  *         #GNUNET_NO if one_shot was set and we have another message ready
327  *         #GNUNET_SYSERR if the data stream is corrupt
328  */
329 int
330 GNUNET_MST_read (struct GNUNET_MessageStreamTokenizer *mst,
331                  struct GNUNET_NETWORK_Handle *sock,
332                  int purge,
333                  int one_shot)
334 {
335   ssize_t ret;
336   size_t left;
337   char *buf;
338
339   left = mst->curr_buf - mst->pos;
340   buf = (char *) mst->hdr;
341   ret = GNUNET_NETWORK_socket_recv (sock,
342                                     &buf[mst->pos],
343                                     left);
344   if (-1 == ret)
345   {
346     if ( (EAGAIN == errno) ||
347          (EINTR == errno) )
348       return GNUNET_OK;
349     GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO,
350                          "recv");
351     return GNUNET_SYSERR;
352   }
353   if (0 == ret)
354   {
355     /* other side closed connection, treat as error */
356     return GNUNET_SYSERR;
357   }
358   mst->pos += ret;
359   return GNUNET_MST_from_buffer (mst,
360                                  NULL,
361                                  0,
362                                  purge,
363                                  one_shot);
364 }
365
366
367 /**
368  * Obtain the next message from the @a mst, assuming that
369  * there are more unprocessed messages in the internal buffer
370  * of the @a mst.
371  *
372  * @param mst tokenizer to use
373  * @param one_shot only call callback once, keep rest of message in buffer
374  * @return #GNUNET_OK if we are done processing (need more data)
375  *         #GNUNET_NO if one_shot was set and we have another message ready
376  *         #GNUNET_SYSERR if the data stream is corrupt
377  */
378 int
379 GNUNET_MST_next (struct GNUNET_MessageStreamTokenizer *mst,
380                  int one_shot)
381 {
382   return GNUNET_MST_from_buffer (mst,
383                                  NULL,
384                                  0,
385                                  GNUNET_NO,
386                                  one_shot);
387 }
388
389
390 /**
391  * Destroys a tokenizer.
392  *
393  * @param mst tokenizer to destroy
394  */
395 void
396 GNUNET_MST_destroy (struct GNUNET_MessageStreamTokenizer *mst)
397 {
398   GNUNET_free (mst->hdr);
399   GNUNET_free (mst);
400 }
401
402
403
404 /* end of server_mst.c */