tighten formatting rules
[oweals/gnunet.git] / src / util / mst.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2010, 2016, 2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file util/mst.c
23  * @brief convenience functions for handling inbound message buffers
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29
30
31 #if HAVE_UNALIGNED_64_ACCESS
32 #define ALIGN_FACTOR 4
33 #else
34 #define ALIGN_FACTOR 8
35 #endif
36
37 #define LOG(kind, ...) GNUNET_log_from (kind, "util-mst", __VA_ARGS__)
38
39
40 /**
41  * Handle to a message stream tokenizer.
42  */
43 struct GNUNET_MessageStreamTokenizer
44 {
45   /**
46    * Function to call on completed messages.
47    */
48   GNUNET_MessageTokenizerCallback cb;
49
50   /**
51    * Closure for @e cb.
52    */
53   void *cb_cls;
54
55   /**
56    * Size of the buffer (starting at @e hdr).
57    */
58   size_t curr_buf;
59
60   /**
61    * How many bytes in buffer have we already processed?
62    */
63   size_t off;
64
65   /**
66    * How many bytes in buffer are valid right now?
67    */
68   size_t pos;
69
70   /**
71    * Beginning of the buffer.  Typed like this to force alignment.
72    */
73   struct GNUNET_MessageHeader *hdr;
74 };
75
76
77 /**
78  * Create a message stream tokenizer.
79  *
80  * @param cb function to call on completed messages
81  * @param cb_cls closure for @a cb
82  * @return handle to tokenizer
83  */
84 struct GNUNET_MessageStreamTokenizer *
85 GNUNET_MST_create (GNUNET_MessageTokenizerCallback cb,
86                    void *cb_cls)
87 {
88   struct GNUNET_MessageStreamTokenizer *ret;
89
90   ret = GNUNET_new (struct GNUNET_MessageStreamTokenizer);
91   ret->hdr = GNUNET_malloc (GNUNET_MIN_MESSAGE_SIZE);
92   ret->curr_buf = GNUNET_MIN_MESSAGE_SIZE;
93   ret->cb = cb;
94   ret->cb_cls = cb_cls;
95   return ret;
96 }
97
98
99 /**
100  * Add incoming data to the receive buffer and call the
101  * callback for all complete messages.
102  *
103  * @param mst tokenizer to use
104  * @param buf input data to add
105  * @param size number of bytes in @a buf
106  * @param purge should any excess bytes in the buffer be discarded
107  *       (i.e. for packet-based services like UDP)
108  * @param one_shot only call callback once, keep rest of message in buffer
109  * @return #GNUNET_OK if we are done processing (need more data)
110  *         #GNUNET_NO if @a one_shot was set and we have another message ready
111  *         #GNUNET_SYSERR if the data stream is corrupt
112  */
113 int
114 GNUNET_MST_from_buffer (struct GNUNET_MessageStreamTokenizer *mst,
115                         const char *buf,
116                         size_t size,
117                         int purge,
118                         int one_shot)
119 {
120   const struct GNUNET_MessageHeader *hdr;
121   size_t delta;
122   uint16_t want;
123   char *ibuf;
124   int need_align;
125   unsigned long offset;
126   int ret;
127   int cbret;
128
129   GNUNET_assert (mst->off <= mst->pos);
130   GNUNET_assert (mst->pos <= mst->curr_buf);
131   LOG (GNUNET_ERROR_TYPE_DEBUG,
132        "MST receives %u bytes with %u bytes already in private buffer\n",
133        (unsigned int) size,
134        (unsigned int) (mst->pos - mst->off));
135   ret = GNUNET_OK;
136   ibuf = (char *) mst->hdr;
137   while (mst->pos > 0)
138   {
139 do_align:
140     GNUNET_assert (mst->pos >= mst->off);
141     if ((mst->curr_buf - mst->off < sizeof(struct GNUNET_MessageHeader)) ||
142         (0 != (mst->off % ALIGN_FACTOR)))
143     {
144       /* need to align or need more space */
145       mst->pos -= mst->off;
146       memmove (ibuf,
147                &ibuf[mst->off],
148                mst->pos);
149       mst->off = 0;
150     }
151     if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
152     {
153       delta
154         = GNUNET_MIN (sizeof(struct GNUNET_MessageHeader)
155                       - (mst->pos - mst->off),
156                       size);
157       GNUNET_memcpy (&ibuf[mst->pos],
158                      buf,
159                      delta);
160       mst->pos += delta;
161       buf += delta;
162       size -= delta;
163     }
164     if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
165     {
166       if (purge)
167       {
168         mst->off = 0;
169         mst->pos = 0;
170       }
171       return GNUNET_OK;
172     }
173     hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
174     want = ntohs (hdr->size);
175     if (want < sizeof(struct GNUNET_MessageHeader))
176     {
177       GNUNET_break_op (0);
178       return GNUNET_SYSERR;
179     }
180     if ((mst->curr_buf - mst->off < want) &&
181         (mst->off > 0))
182     {
183       /* can get more space by moving */
184       mst->pos -= mst->off;
185       memmove (ibuf,
186                &ibuf[mst->off],
187                mst->pos);
188       mst->off = 0;
189     }
190     if (mst->curr_buf < want)
191     {
192       /* need to get more space by growing buffer */
193       GNUNET_assert (0 == mst->off);
194       mst->hdr = GNUNET_realloc (mst->hdr,
195                                  want);
196       ibuf = (char *) mst->hdr;
197       mst->curr_buf = want;
198     }
199     hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
200     if (mst->pos - mst->off < want)
201     {
202       delta = GNUNET_MIN (want - (mst->pos - mst->off),
203                           size);
204       GNUNET_assert (mst->pos + delta <= mst->curr_buf);
205       GNUNET_memcpy (&ibuf[mst->pos],
206                      buf,
207                      delta);
208       mst->pos += delta;
209       buf += delta;
210       size -= delta;
211     }
212     if (mst->pos - mst->off < want)
213     {
214       if (purge)
215       {
216         mst->off = 0;
217         mst->pos = 0;
218       }
219       return GNUNET_OK;
220     }
221     if (one_shot == GNUNET_SYSERR)
222     {
223       /* cannot call callback again, but return value saying that
224        * we have another full message in the buffer */
225       ret = GNUNET_NO;
226       goto copy;
227     }
228     if (one_shot == GNUNET_YES)
229       one_shot = GNUNET_SYSERR;
230     mst->off += want;
231     if (GNUNET_OK !=
232         (cbret = mst->cb (mst->cb_cls,
233                           hdr)))
234     {
235       if (GNUNET_SYSERR == cbret)
236         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
237                     "Failure processing message of type %u and size %u\n",
238                     ntohs (hdr->type),
239                     ntohs (hdr->size));
240       return GNUNET_SYSERR;
241     }
242     if (mst->off == mst->pos)
243     {
244       /* reset to beginning of buffer, it's free right now! */
245       mst->off = 0;
246       mst->pos = 0;
247     }
248   }
249   GNUNET_assert (0 == mst->pos);
250   while (size > 0)
251   {
252     LOG (GNUNET_ERROR_TYPE_DEBUG,
253          "Server-mst has %u bytes left in inbound buffer\n",
254          (unsigned int) size);
255     if (size < sizeof(struct GNUNET_MessageHeader))
256       break;
257     offset = (unsigned long) buf;
258     need_align = (0 != (offset % ALIGN_FACTOR)) ? GNUNET_YES : GNUNET_NO;
259     if (GNUNET_NO == need_align)
260     {
261       /* can try to do zero-copy and process directly from original buffer */
262       hdr = (const struct GNUNET_MessageHeader *) buf;
263       want = ntohs (hdr->size);
264       if (want < sizeof(struct GNUNET_MessageHeader))
265       {
266         GNUNET_break_op (0);
267         mst->off = 0;
268         return GNUNET_SYSERR;
269       }
270       if (size < want)
271         break;                  /* or not: buffer incomplete, so copy to private buffer... */
272       if (one_shot == GNUNET_SYSERR)
273       {
274         /* cannot call callback again, but return value saying that
275          * we have another full message in the buffer */
276         ret = GNUNET_NO;
277         goto copy;
278       }
279       if (one_shot == GNUNET_YES)
280         one_shot = GNUNET_SYSERR;
281       if (GNUNET_OK !=
282           (cbret = mst->cb (mst->cb_cls,
283                             hdr)))
284       {
285         if (GNUNET_SYSERR == cbret)
286           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
287                       "Failure processing message of type %u and size %u\n",
288                       ntohs (hdr->type),
289                       ntohs (hdr->size));
290         return GNUNET_SYSERR;
291       }
292       buf += want;
293       size -= want;
294     }
295     else
296     {
297       /* need to copy to private buffer to align;
298        * yes, we go a bit more spagetti than usual here */
299       goto do_align;
300     }
301   }
302 copy:
303   if ((size > 0) && (! purge))
304   {
305     if (size + mst->pos > mst->curr_buf)
306     {
307       mst->hdr = GNUNET_realloc (mst->hdr,
308                                  size + mst->pos);
309       ibuf = (char *) mst->hdr;
310       mst->curr_buf = size + mst->pos;
311     }
312     GNUNET_assert (size + mst->pos <= mst->curr_buf);
313     GNUNET_memcpy (&ibuf[mst->pos],
314                    buf,
315                    size);
316     mst->pos += size;
317   }
318   if (purge)
319   {
320     mst->off = 0;
321     mst->pos = 0;
322   }
323   LOG (GNUNET_ERROR_TYPE_DEBUG,
324        "Server-mst leaves %u bytes in private buffer\n",
325        (unsigned int) (mst->pos - mst->off));
326   return ret;
327 }
328
329
330 /**
331  * Add incoming data to the receive buffer and call the
332  * callback for all complete messages.
333  *
334  * @param mst tokenizer to use
335  * @param buf input data to add
336  * @param size number of bytes in @a buf
337  * @param purge should any excess bytes in the buffer be discarded
338  *       (i.e. for packet-based services like UDP)
339  * @param one_shot only call callback once, keep rest of message in buffer
340  * @return #GNUNET_OK if we are done processing (need more data)
341  *         #GNUNET_NO if one_shot was set and we have another message ready
342  *         #GNUNET_SYSERR if the data stream is corrupt
343  */
344 int
345 GNUNET_MST_read (struct GNUNET_MessageStreamTokenizer *mst,
346                  struct GNUNET_NETWORK_Handle *sock,
347                  int purge,
348                  int one_shot)
349 {
350   ssize_t ret;
351   size_t left;
352   char *buf;
353
354   left = mst->curr_buf - mst->pos;
355   buf = (char *) mst->hdr;
356   ret = GNUNET_NETWORK_socket_recv (sock,
357                                     &buf[mst->pos],
358                                     left);
359   if (-1 == ret)
360   {
361     if ((EAGAIN == errno) ||
362         (EINTR == errno))
363       return GNUNET_OK;
364     GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO,
365                          "recv");
366     return GNUNET_SYSERR;
367   }
368   if (0 == ret)
369   {
370     /* other side closed connection, treat as error */
371     return GNUNET_SYSERR;
372   }
373   mst->pos += ret;
374   return GNUNET_MST_from_buffer (mst,
375                                  NULL,
376                                  0,
377                                  purge,
378                                  one_shot);
379 }
380
381
382 /**
383  * Obtain the next message from the @a mst, assuming that
384  * there are more unprocessed messages in the internal buffer
385  * of the @a mst.
386  *
387  * @param mst tokenizer to use
388  * @param one_shot only call callback once, keep rest of message in buffer
389  * @return #GNUNET_OK if we are done processing (need more data)
390  *         #GNUNET_NO if one_shot was set and we have another message ready
391  *         #GNUNET_SYSERR if the data stream is corrupt
392  */
393 int
394 GNUNET_MST_next (struct GNUNET_MessageStreamTokenizer *mst,
395                  int one_shot)
396 {
397   return GNUNET_MST_from_buffer (mst,
398                                  NULL,
399                                  0,
400                                  GNUNET_NO,
401                                  one_shot);
402 }
403
404
405 /**
406  * Destroys a tokenizer.
407  *
408  * @param mst tokenizer to destroy
409  */
410 void
411 GNUNET_MST_destroy (struct GNUNET_MessageStreamTokenizer *mst)
412 {
413   GNUNET_free (mst->hdr);
414   GNUNET_free (mst);
415 }
416
417
418 /* end of server_mst.c */