4ccf4988f5418338f87b13d115648beca656f31c
[oweals/gnunet.git] / src / util / mst.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2010, 2016, 2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file util/mst.c
23  * @brief convenience functions for handling inbound message buffers
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29
30
31 #if HAVE_UNALIGNED_64_ACCESS
32 #define ALIGN_FACTOR 4
33 #else
34 #define ALIGN_FACTOR 8
35 #endif
36
37 #define LOG(kind, ...) GNUNET_log_from(kind, "util-mst", __VA_ARGS__)
38
39
40 /**
41  * Handle to a message stream tokenizer.
42  */
43 struct GNUNET_MessageStreamTokenizer {
44   /**
45    * Function to call on completed messages.
46    */
47   GNUNET_MessageTokenizerCallback cb;
48
49   /**
50    * Closure for @e cb.
51    */
52   void *cb_cls;
53
54   /**
55    * Size of the buffer (starting at @e hdr).
56    */
57   size_t curr_buf;
58
59   /**
60    * How many bytes in buffer have we already processed?
61    */
62   size_t off;
63
64   /**
65    * How many bytes in buffer are valid right now?
66    */
67   size_t pos;
68
69   /**
70    * Beginning of the buffer.  Typed like this to force alignment.
71    */
72   struct GNUNET_MessageHeader *hdr;
73 };
74
75
76 /**
77  * Create a message stream tokenizer.
78  *
79  * @param cb function to call on completed messages
80  * @param cb_cls closure for @a cb
81  * @return handle to tokenizer
82  */
83 struct GNUNET_MessageStreamTokenizer *
84 GNUNET_MST_create(GNUNET_MessageTokenizerCallback cb,
85                   void *cb_cls)
86 {
87   struct GNUNET_MessageStreamTokenizer *ret;
88
89   ret = GNUNET_new(struct GNUNET_MessageStreamTokenizer);
90   ret->hdr = GNUNET_malloc(GNUNET_MIN_MESSAGE_SIZE);
91   ret->curr_buf = GNUNET_MIN_MESSAGE_SIZE;
92   ret->cb = cb;
93   ret->cb_cls = cb_cls;
94   return ret;
95 }
96
97
98 /**
99  * Add incoming data to the receive buffer and call the
100  * callback for all complete messages.
101  *
102  * @param mst tokenizer to use
103  * @param buf input data to add
104  * @param size number of bytes in @a buf
105  * @param purge should any excess bytes in the buffer be discarded
106  *       (i.e. for packet-based services like UDP)
107  * @param one_shot only call callback once, keep rest of message in buffer
108  * @return #GNUNET_OK if we are done processing (need more data)
109  *         #GNUNET_NO if @a one_shot was set and we have another message ready
110  *         #GNUNET_SYSERR if the data stream is corrupt
111  */
112 int
113 GNUNET_MST_from_buffer(struct GNUNET_MessageStreamTokenizer *mst,
114                        const char *buf,
115                        size_t size,
116                        int purge,
117                        int one_shot)
118 {
119   const struct GNUNET_MessageHeader *hdr;
120   size_t delta;
121   uint16_t want;
122   char *ibuf;
123   int need_align;
124   unsigned long offset;
125   int ret;
126   int cbret;
127
128   GNUNET_assert(mst->off <= mst->pos);
129   GNUNET_assert(mst->pos <= mst->curr_buf);
130   LOG(GNUNET_ERROR_TYPE_DEBUG,
131       "MST receives %u bytes with %u bytes already in private buffer\n",
132       (unsigned int)size,
133       (unsigned int)(mst->pos - mst->off));
134   ret = GNUNET_OK;
135   ibuf = (char *)mst->hdr;
136   while (mst->pos > 0)
137     {
138 do_align:
139       GNUNET_assert(mst->pos >= mst->off);
140       if ((mst->curr_buf - mst->off < sizeof(struct GNUNET_MessageHeader)) ||
141           (0 != (mst->off % ALIGN_FACTOR)))
142         {
143           /* need to align or need more space */
144           mst->pos -= mst->off;
145           memmove(ibuf,
146                   &ibuf[mst->off],
147                   mst->pos);
148           mst->off = 0;
149         }
150       if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
151         {
152           delta
153             = GNUNET_MIN(sizeof(struct GNUNET_MessageHeader)
154                          - (mst->pos - mst->off),
155                          size);
156           GNUNET_memcpy(&ibuf[mst->pos],
157                         buf,
158                         delta);
159           mst->pos += delta;
160           buf += delta;
161           size -= delta;
162         }
163       if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
164         {
165           if (purge)
166             {
167               mst->off = 0;
168               mst->pos = 0;
169             }
170           return GNUNET_OK;
171         }
172       hdr = (const struct GNUNET_MessageHeader *)&ibuf[mst->off];
173       want = ntohs(hdr->size);
174       if (want < sizeof(struct GNUNET_MessageHeader))
175         {
176           GNUNET_break_op(0);
177           return GNUNET_SYSERR;
178         }
179       if ((mst->curr_buf - mst->off < want) &&
180           (mst->off > 0))
181         {
182           /* can get more space by moving */
183           mst->pos -= mst->off;
184           memmove(ibuf,
185                   &ibuf[mst->off],
186                   mst->pos);
187           mst->off = 0;
188         }
189       if (mst->curr_buf < want)
190         {
191           /* need to get more space by growing buffer */
192           GNUNET_assert(0 == mst->off);
193           mst->hdr = GNUNET_realloc(mst->hdr,
194                                     want);
195           ibuf = (char *)mst->hdr;
196           mst->curr_buf = want;
197         }
198       hdr = (const struct GNUNET_MessageHeader *)&ibuf[mst->off];
199       if (mst->pos - mst->off < want)
200         {
201           delta = GNUNET_MIN(want - (mst->pos - mst->off),
202                              size);
203           GNUNET_assert(mst->pos + delta <= mst->curr_buf);
204           GNUNET_memcpy(&ibuf[mst->pos],
205                         buf,
206                         delta);
207           mst->pos += delta;
208           buf += delta;
209           size -= delta;
210         }
211       if (mst->pos - mst->off < want)
212         {
213           if (purge)
214             {
215               mst->off = 0;
216               mst->pos = 0;
217             }
218           return GNUNET_OK;
219         }
220       if (one_shot == GNUNET_SYSERR)
221         {
222           /* cannot call callback again, but return value saying that
223            * we have another full message in the buffer */
224           ret = GNUNET_NO;
225           goto copy;
226         }
227       if (one_shot == GNUNET_YES)
228         one_shot = GNUNET_SYSERR;
229       mst->off += want;
230       if (GNUNET_OK !=
231           (cbret = mst->cb(mst->cb_cls,
232                            hdr)))
233         {
234           if (GNUNET_SYSERR == cbret)
235             GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
236                        "Failure processing message of type %u and size %u\n",
237                        ntohs(hdr->type),
238                        ntohs(hdr->size));
239           return GNUNET_SYSERR;
240         }
241       if (mst->off == mst->pos)
242         {
243           /* reset to beginning of buffer, it's free right now! */
244           mst->off = 0;
245           mst->pos = 0;
246         }
247     }
248   GNUNET_assert(0 == mst->pos);
249   while (size > 0)
250     {
251       LOG(GNUNET_ERROR_TYPE_DEBUG,
252           "Server-mst has %u bytes left in inbound buffer\n",
253           (unsigned int)size);
254       if (size < sizeof(struct GNUNET_MessageHeader))
255         break;
256       offset = (unsigned long)buf;
257       need_align = (0 != (offset % ALIGN_FACTOR)) ? GNUNET_YES : GNUNET_NO;
258       if (GNUNET_NO == need_align)
259         {
260           /* can try to do zero-copy and process directly from original buffer */
261           hdr = (const struct GNUNET_MessageHeader *)buf;
262           want = ntohs(hdr->size);
263           if (want < sizeof(struct GNUNET_MessageHeader))
264             {
265               GNUNET_break_op(0);
266               mst->off = 0;
267               return GNUNET_SYSERR;
268             }
269           if (size < want)
270             break;              /* or not: buffer incomplete, so copy to private buffer... */
271           if (one_shot == GNUNET_SYSERR)
272             {
273               /* cannot call callback again, but return value saying that
274                * we have another full message in the buffer */
275               ret = GNUNET_NO;
276               goto copy;
277             }
278           if (one_shot == GNUNET_YES)
279             one_shot = GNUNET_SYSERR;
280           if (GNUNET_OK !=
281               (cbret = mst->cb(mst->cb_cls,
282                                hdr)))
283             {
284               if (GNUNET_SYSERR == cbret)
285                 GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
286                            "Failure processing message of type %u and size %u\n",
287                            ntohs(hdr->type),
288                            ntohs(hdr->size));
289               return GNUNET_SYSERR;
290             }
291           buf += want;
292           size -= want;
293         }
294       else
295         {
296           /* need to copy to private buffer to align;
297            * yes, we go a bit more spagetti than usual here */
298           goto do_align;
299         }
300     }
301 copy:
302   if ((size > 0) && (!purge))
303     {
304       if (size + mst->pos > mst->curr_buf)
305         {
306           mst->hdr = GNUNET_realloc(mst->hdr,
307                                     size + mst->pos);
308           ibuf = (char *)mst->hdr;
309           mst->curr_buf = size + mst->pos;
310         }
311       GNUNET_assert(size + mst->pos <= mst->curr_buf);
312       GNUNET_memcpy(&ibuf[mst->pos],
313                     buf,
314                     size);
315       mst->pos += size;
316     }
317   if (purge)
318     {
319       mst->off = 0;
320       mst->pos = 0;
321     }
322   LOG(GNUNET_ERROR_TYPE_DEBUG,
323       "Server-mst leaves %u bytes in private buffer\n",
324       (unsigned int)(mst->pos - mst->off));
325   return ret;
326 }
327
328
329 /**
330  * Add incoming data to the receive buffer and call the
331  * callback for all complete messages.
332  *
333  * @param mst tokenizer to use
334  * @param buf input data to add
335  * @param size number of bytes in @a buf
336  * @param purge should any excess bytes in the buffer be discarded
337  *       (i.e. for packet-based services like UDP)
338  * @param one_shot only call callback once, keep rest of message in buffer
339  * @return #GNUNET_OK if we are done processing (need more data)
340  *         #GNUNET_NO if one_shot was set and we have another message ready
341  *         #GNUNET_SYSERR if the data stream is corrupt
342  */
343 int
344 GNUNET_MST_read(struct GNUNET_MessageStreamTokenizer *mst,
345                 struct GNUNET_NETWORK_Handle *sock,
346                 int purge,
347                 int one_shot)
348 {
349   ssize_t ret;
350   size_t left;
351   char *buf;
352
353   left = mst->curr_buf - mst->pos;
354   buf = (char *)mst->hdr;
355   ret = GNUNET_NETWORK_socket_recv(sock,
356                                    &buf[mst->pos],
357                                    left);
358   if (-1 == ret)
359     {
360       if ((EAGAIN == errno) ||
361           (EINTR == errno))
362         return GNUNET_OK;
363       GNUNET_log_strerror(GNUNET_ERROR_TYPE_INFO,
364                           "recv");
365       return GNUNET_SYSERR;
366     }
367   if (0 == ret)
368     {
369       /* other side closed connection, treat as error */
370       return GNUNET_SYSERR;
371     }
372   mst->pos += ret;
373   return GNUNET_MST_from_buffer(mst,
374                                 NULL,
375                                 0,
376                                 purge,
377                                 one_shot);
378 }
379
380
381 /**
382  * Obtain the next message from the @a mst, assuming that
383  * there are more unprocessed messages in the internal buffer
384  * of the @a mst.
385  *
386  * @param mst tokenizer to use
387  * @param one_shot only call callback once, keep rest of message in buffer
388  * @return #GNUNET_OK if we are done processing (need more data)
389  *         #GNUNET_NO if one_shot was set and we have another message ready
390  *         #GNUNET_SYSERR if the data stream is corrupt
391  */
392 int
393 GNUNET_MST_next(struct GNUNET_MessageStreamTokenizer *mst,
394                 int one_shot)
395 {
396   return GNUNET_MST_from_buffer(mst,
397                                 NULL,
398                                 0,
399                                 GNUNET_NO,
400                                 one_shot);
401 }
402
403
404 /**
405  * Destroys a tokenizer.
406  *
407  * @param mst tokenizer to destroy
408  */
409 void
410 GNUNET_MST_destroy(struct GNUNET_MessageStreamTokenizer *mst)
411 {
412   GNUNET_free(mst->hdr);
413   GNUNET_free(mst);
414 }
415
416
417
418 /* end of server_mst.c */