Merge branch 'cadet-new-options'
[oweals/gnunet.git] / src / util / mst.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2010, 2016, 2017 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file util/mst.c
23  * @brief convenience functions for handling inbound message buffers
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29
30
31 #if HAVE_UNALIGNED_64_ACCESS
32 #define ALIGN_FACTOR 4
33 #else
34 #define ALIGN_FACTOR 8
35 #endif
36
37 #define LOG(kind,...) GNUNET_log_from (kind, "util-mst", __VA_ARGS__)
38
39
40 /**
41  * Handle to a message stream tokenizer.
42  */
43 struct GNUNET_MessageStreamTokenizer
44 {
45
46   /**
47    * Function to call on completed messages.
48    */
49   GNUNET_MessageTokenizerCallback cb;
50
51   /**
52    * Closure for @e cb.
53    */
54   void *cb_cls;
55
56   /**
57    * Size of the buffer (starting at @e hdr).
58    */
59   size_t curr_buf;
60
61   /**
62    * How many bytes in buffer have we already processed?
63    */
64   size_t off;
65
66   /**
67    * How many bytes in buffer are valid right now?
68    */
69   size_t pos;
70
71   /**
72    * Beginning of the buffer.  Typed like this to force alignment.
73    */
74   struct GNUNET_MessageHeader *hdr;
75
76 };
77
78
79 /**
80  * Create a message stream tokenizer.
81  *
82  * @param cb function to call on completed messages
83  * @param cb_cls closure for @a cb
84  * @return handle to tokenizer
85  */
86 struct GNUNET_MessageStreamTokenizer *
87 GNUNET_MST_create (GNUNET_MessageTokenizerCallback cb,
88                    void *cb_cls)
89 {
90   struct GNUNET_MessageStreamTokenizer *ret;
91
92   ret = GNUNET_new (struct GNUNET_MessageStreamTokenizer);
93   ret->hdr = GNUNET_malloc (GNUNET_MIN_MESSAGE_SIZE);
94   ret->curr_buf = GNUNET_MIN_MESSAGE_SIZE;
95   ret->cb = cb;
96   ret->cb_cls = cb_cls;
97   return ret;
98 }
99
100
101 /**
102  * Add incoming data to the receive buffer and call the
103  * callback for all complete messages.
104  *
105  * @param mst tokenizer to use
106  * @param buf input data to add
107  * @param size number of bytes in @a buf
108  * @param purge should any excess bytes in the buffer be discarded
109  *       (i.e. for packet-based services like UDP)
110  * @param one_shot only call callback once, keep rest of message in buffer
111  * @return #GNUNET_OK if we are done processing (need more data)
112  *         #GNUNET_NO if @a one_shot was set and we have another message ready
113  *         #GNUNET_SYSERR if the data stream is corrupt
114  */
115 int
116 GNUNET_MST_from_buffer (struct GNUNET_MessageStreamTokenizer *mst,
117                         const char *buf,
118                         size_t size,
119                         int purge,
120                         int one_shot)
121 {
122   const struct GNUNET_MessageHeader *hdr;
123   size_t delta;
124   uint16_t want;
125   char *ibuf;
126   int need_align;
127   unsigned long offset;
128   int ret;
129   int cbret;
130
131   GNUNET_assert (mst->off <= mst->pos);
132   GNUNET_assert (mst->pos <= mst->curr_buf);
133   LOG (GNUNET_ERROR_TYPE_DEBUG,
134        "MST receives %u bytes with %u bytes already in private buffer\n",
135        (unsigned int) size,
136        (unsigned int) (mst->pos - mst->off));
137   ret = GNUNET_OK;
138   ibuf = (char *) mst->hdr;
139   while (mst->pos > 0)
140   {
141 do_align:
142     GNUNET_assert (mst->pos >= mst->off);
143     if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) ||
144         (0 != (mst->off % ALIGN_FACTOR)))
145     {
146       /* need to align or need more space */
147       mst->pos -= mst->off;
148       memmove (ibuf,
149                &ibuf[mst->off],
150                mst->pos);
151       mst->off = 0;
152     }
153     if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
154     {
155       delta
156         = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader)
157                       - (mst->pos - mst->off),
158                       size);
159       GNUNET_memcpy (&ibuf[mst->pos],
160                      buf,
161                      delta);
162       mst->pos += delta;
163       buf += delta;
164       size -= delta;
165     }
166     if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
167     {
168       if (purge)
169       {
170         mst->off = 0;
171         mst->pos = 0;
172       }
173       return GNUNET_OK;
174     }
175     hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
176     want = ntohs (hdr->size);
177     if (want < sizeof (struct GNUNET_MessageHeader))
178     {
179       GNUNET_break_op (0);
180       return GNUNET_SYSERR;
181     }
182     if ( (mst->curr_buf - mst->off < want) &&
183          (mst->off > 0) )
184     {
185       /* can get more space by moving */
186       mst->pos -= mst->off;
187       memmove (ibuf,
188                &ibuf[mst->off],
189                mst->pos);
190       mst->off = 0;
191     }
192     if (mst->curr_buf < want)
193     {
194       /* need to get more space by growing buffer */
195       GNUNET_assert (0 == mst->off);
196       mst->hdr = GNUNET_realloc (mst->hdr,
197                                  want);
198       ibuf = (char *) mst->hdr;
199       mst->curr_buf = want;
200     }
201     hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
202     if (mst->pos - mst->off < want)
203     {
204       delta = GNUNET_MIN (want - (mst->pos - mst->off),
205                           size);
206       GNUNET_assert (mst->pos + delta <= mst->curr_buf);
207       GNUNET_memcpy (&ibuf[mst->pos],
208                      buf,
209                      delta);
210       mst->pos += delta;
211       buf += delta;
212       size -= delta;
213     }
214     if (mst->pos - mst->off < want)
215     {
216       if (purge)
217       {
218         mst->off = 0;
219         mst->pos = 0;
220       }
221       return GNUNET_OK;
222     }
223     if (one_shot == GNUNET_SYSERR)
224     {
225       /* cannot call callback again, but return value saying that
226        * we have another full message in the buffer */
227       ret = GNUNET_NO;
228       goto copy;
229     }
230     if (one_shot == GNUNET_YES)
231       one_shot = GNUNET_SYSERR;
232     mst->off += want;
233     if (GNUNET_OK !=
234         (cbret = mst->cb (mst->cb_cls,
235                            hdr)))
236     {
237       if (GNUNET_SYSERR == cbret)
238         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
239                     "Failure processing message of type %u and size %u\n",
240                     ntohs (hdr->type),
241                     ntohs (hdr->size));
242       return GNUNET_SYSERR;
243     }
244     if (mst->off == mst->pos)
245     {
246       /* reset to beginning of buffer, it's free right now! */
247       mst->off = 0;
248       mst->pos = 0;
249     }
250   }
251   GNUNET_assert (0 == mst->pos);
252   while (size > 0)
253   {
254     LOG (GNUNET_ERROR_TYPE_DEBUG,
255          "Server-mst has %u bytes left in inbound buffer\n",
256          (unsigned int) size);
257     if (size < sizeof (struct GNUNET_MessageHeader))
258       break;
259     offset = (unsigned long) buf;
260     need_align = (0 != (offset % ALIGN_FACTOR)) ? GNUNET_YES : GNUNET_NO;
261     if (GNUNET_NO == need_align)
262     {
263       /* can try to do zero-copy and process directly from original buffer */
264       hdr = (const struct GNUNET_MessageHeader *) buf;
265       want = ntohs (hdr->size);
266       if (want < sizeof (struct GNUNET_MessageHeader))
267       {
268         GNUNET_break_op (0);
269         mst->off = 0;
270         return GNUNET_SYSERR;
271       }
272       if (size < want)
273         break;                  /* or not: buffer incomplete, so copy to private buffer... */
274       if (one_shot == GNUNET_SYSERR)
275       {
276         /* cannot call callback again, but return value saying that
277          * we have another full message in the buffer */
278         ret = GNUNET_NO;
279         goto copy;
280       }
281       if (one_shot == GNUNET_YES)
282         one_shot = GNUNET_SYSERR;
283       if (GNUNET_OK !=
284           (cbret = mst->cb (mst->cb_cls,
285                             hdr)))
286       {
287         if (GNUNET_SYSERR == cbret)
288           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
289                       "Failure processing message of type %u and size %u\n",
290                       ntohs (hdr->type),
291                       ntohs (hdr->size));
292         return GNUNET_SYSERR;
293       }
294       buf += want;
295       size -= want;
296     }
297     else
298     {
299       /* need to copy to private buffer to align;
300        * yes, we go a bit more spagetti than usual here */
301       goto do_align;
302     }
303   }
304 copy:
305   if ((size > 0) && (!purge))
306   {
307     if (size + mst->pos > mst->curr_buf)
308     {
309       mst->hdr = GNUNET_realloc (mst->hdr,
310                                  size + mst->pos);
311       ibuf = (char *) mst->hdr;
312       mst->curr_buf = size + mst->pos;
313     }
314     GNUNET_assert (size + mst->pos <= mst->curr_buf);
315     GNUNET_memcpy (&ibuf[mst->pos],
316                    buf,
317                    size);
318     mst->pos += size;
319   }
320   if (purge)
321   {
322     mst->off = 0;
323     mst->pos = 0;
324   }
325   LOG (GNUNET_ERROR_TYPE_DEBUG,
326        "Server-mst leaves %u bytes in private buffer\n",
327        (unsigned int) (mst->pos - mst->off));
328   return ret;
329 }
330
331
332 /**
333  * Add incoming data to the receive buffer and call the
334  * callback for all complete messages.
335  *
336  * @param mst tokenizer to use
337  * @param buf input data to add
338  * @param size number of bytes in @a buf
339  * @param purge should any excess bytes in the buffer be discarded
340  *       (i.e. for packet-based services like UDP)
341  * @param one_shot only call callback once, keep rest of message in buffer
342  * @return #GNUNET_OK if we are done processing (need more data)
343  *         #GNUNET_NO if one_shot was set and we have another message ready
344  *         #GNUNET_SYSERR if the data stream is corrupt
345  */
346 int
347 GNUNET_MST_read (struct GNUNET_MessageStreamTokenizer *mst,
348                  struct GNUNET_NETWORK_Handle *sock,
349                  int purge,
350                  int one_shot)
351 {
352   ssize_t ret;
353   size_t left;
354   char *buf;
355
356   left = mst->curr_buf - mst->pos;
357   buf = (char *) mst->hdr;
358   ret = GNUNET_NETWORK_socket_recv (sock,
359                                     &buf[mst->pos],
360                                     left);
361   if (-1 == ret)
362   {
363     if ( (EAGAIN == errno) ||
364          (EINTR == errno) )
365       return GNUNET_OK;
366     GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO,
367                          "recv");
368     return GNUNET_SYSERR;
369   }
370   if (0 == ret)
371   {
372     /* other side closed connection, treat as error */
373     return GNUNET_SYSERR;
374   }
375   mst->pos += ret;
376   return GNUNET_MST_from_buffer (mst,
377                                  NULL,
378                                  0,
379                                  purge,
380                                  one_shot);
381 }
382
383
384 /**
385  * Obtain the next message from the @a mst, assuming that
386  * there are more unprocessed messages in the internal buffer
387  * of the @a mst.
388  *
389  * @param mst tokenizer to use
390  * @param one_shot only call callback once, keep rest of message in buffer
391  * @return #GNUNET_OK if we are done processing (need more data)
392  *         #GNUNET_NO if one_shot was set and we have another message ready
393  *         #GNUNET_SYSERR if the data stream is corrupt
394  */
395 int
396 GNUNET_MST_next (struct GNUNET_MessageStreamTokenizer *mst,
397                  int one_shot)
398 {
399   return GNUNET_MST_from_buffer (mst,
400                                  NULL,
401                                  0,
402                                  GNUNET_NO,
403                                  one_shot);
404 }
405
406
407 /**
408  * Destroys a tokenizer.
409  *
410  * @param mst tokenizer to destroy
411  */
412 void
413 GNUNET_MST_destroy (struct GNUNET_MessageStreamTokenizer *mst)
414 {
415   GNUNET_free (mst->hdr);
416   GNUNET_free (mst);
417 }
418
419
420
421 /* end of server_mst.c */