minor style fix
[oweals/gnunet.git] / src / util / mst.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2010, 2016, 2017 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file util/mst.c
23  * @brief convenience functions for handling inbound message buffers
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29
30
31 #if HAVE_UNALIGNED_64_ACCESS
32 #define ALIGN_FACTOR 4
33 #else
34 #define ALIGN_FACTOR 8
35 #endif
36
37 #define LOG(kind,...) GNUNET_log_from (kind, "util-mst", __VA_ARGS__)
38
39
40 /**
41  * Handle to a message stream tokenizer.
42  */
43 struct GNUNET_MessageStreamTokenizer
44 {
45
46   /**
47    * Function to call on completed messages.
48    */
49   GNUNET_MessageTokenizerCallback cb;
50
51   /**
52    * Closure for @e cb.
53    */
54   void *cb_cls;
55
56   /**
57    * Size of the buffer (starting at @e hdr).
58    */
59   size_t curr_buf;
60
61   /**
62    * How many bytes in buffer have we already processed?
63    */
64   size_t off;
65
66   /**
67    * How many bytes in buffer are valid right now?
68    */
69   size_t pos;
70
71   /**
72    * Beginning of the buffer.  Typed like this to force alignment.
73    */
74   struct GNUNET_MessageHeader *hdr;
75
76 };
77
78
79 /**
80  * Create a message stream tokenizer.
81  *
82  * @param cb function to call on completed messages
83  * @param cb_cls closure for @a cb
84  * @return handle to tokenizer
85  */
86 struct GNUNET_MessageStreamTokenizer *
87 GNUNET_MST_create (GNUNET_MessageTokenizerCallback cb,
88                    void *cb_cls)
89 {
90   struct GNUNET_MessageStreamTokenizer *ret;
91
92   ret = GNUNET_new (struct GNUNET_MessageStreamTokenizer);
93   ret->hdr = GNUNET_malloc (GNUNET_MIN_MESSAGE_SIZE);
94   ret->curr_buf = GNUNET_MIN_MESSAGE_SIZE;
95   ret->cb = cb;
96   ret->cb_cls = cb_cls;
97   return ret;
98 }
99
100
101 /**
102  * Add incoming data to the receive buffer and call the
103  * callback for all complete messages.
104  *
105  * @param mst tokenizer to use
106  * @param buf input data to add
107  * @param size number of bytes in @a buf
108  * @param purge should any excess bytes in the buffer be discarded
109  *       (i.e. for packet-based services like UDP)
110  * @param one_shot only call callback once, keep rest of message in buffer
111  * @return #GNUNET_OK if we are done processing (need more data)
112  *         #GNUNET_NO if @a one_shot was set and we have another message ready
113  *         #GNUNET_SYSERR if the data stream is corrupt
114  */
115 int
116 GNUNET_MST_from_buffer (struct GNUNET_MessageStreamTokenizer *mst,
117                         const char *buf,
118                         size_t size,
119                         int purge,
120                         int one_shot)
121 {
122   const struct GNUNET_MessageHeader *hdr;
123   size_t delta;
124   uint16_t want;
125   char *ibuf;
126   int need_align;
127   unsigned long offset;
128   int ret;
129
130   GNUNET_assert (mst->off <= mst->pos);
131   GNUNET_assert (mst->pos <= mst->curr_buf);
132   LOG (GNUNET_ERROR_TYPE_DEBUG,
133        "MST receives %u bytes with %u bytes already in private buffer\n",
134        (unsigned int) size,
135        (unsigned int) (mst->pos - mst->off));
136   ret = GNUNET_OK;
137   ibuf = (char *) mst->hdr;
138   while (mst->pos > 0)
139   {
140 do_align:
141     GNUNET_assert (mst->pos >= mst->off);
142     if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) ||
143         (0 != (mst->off % ALIGN_FACTOR)))
144     {
145       /* need to align or need more space */
146       mst->pos -= mst->off;
147       memmove (ibuf,
148                &ibuf[mst->off],
149                mst->pos);
150       mst->off = 0;
151     }
152     if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
153     {
154       delta
155         = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader)
156                       - (mst->pos - mst->off),
157                       size);
158       GNUNET_memcpy (&ibuf[mst->pos],
159                      buf,
160                      delta);
161       mst->pos += delta;
162       buf += delta;
163       size -= delta;
164     }
165     if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
166     {
167       if (purge)
168       {
169         mst->off = 0;
170         mst->pos = 0;
171       }
172       return GNUNET_OK;
173     }
174     hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
175     want = ntohs (hdr->size);
176     if (want < sizeof (struct GNUNET_MessageHeader))
177     {
178       GNUNET_break_op (0);
179       return GNUNET_SYSERR;
180     }
181     if ( (mst->curr_buf - mst->off < want) &&
182          (mst->off > 0) )
183     {
184       /* can get more space by moving */
185       mst->pos -= mst->off;
186       memmove (ibuf,
187                &ibuf[mst->off],
188                mst->pos);
189       mst->off = 0;
190     }
191     if (mst->curr_buf < want)
192     {
193       /* need to get more space by growing buffer */
194       GNUNET_assert (0 == mst->off);
195       mst->hdr = GNUNET_realloc (mst->hdr,
196                                  want);
197       ibuf = (char *) mst->hdr;
198       mst->curr_buf = want;
199     }
200     hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
201     if (mst->pos - mst->off < want)
202     {
203       delta = GNUNET_MIN (want - (mst->pos - mst->off),
204                           size);
205       GNUNET_assert (mst->pos + delta <= mst->curr_buf);
206       GNUNET_memcpy (&ibuf[mst->pos],
207                      buf,
208                      delta);
209       mst->pos += delta;
210       buf += delta;
211       size -= delta;
212     }
213     if (mst->pos - mst->off < want)
214     {
215       if (purge)
216       {
217         mst->off = 0;
218         mst->pos = 0;
219       }
220       return GNUNET_OK;
221     }
222     if (one_shot == GNUNET_SYSERR)
223     {
224       /* cannot call callback again, but return value saying that
225        * we have another full message in the buffer */
226       ret = GNUNET_NO;
227       goto copy;
228     }
229     if (one_shot == GNUNET_YES)
230       one_shot = GNUNET_SYSERR;
231     mst->off += want;
232     if (GNUNET_SYSERR == mst->cb (mst->cb_cls,
233                                   hdr))
234     {
235       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
236                   "Failure processing message of type %u and size %u\n",
237                   ntohs (hdr->type),
238                   ntohs (hdr->size));
239       return GNUNET_SYSERR;
240     }
241     if (mst->off == mst->pos)
242     {
243       /* reset to beginning of buffer, it's free right now! */
244       mst->off = 0;
245       mst->pos = 0;
246     }
247   }
248   GNUNET_assert (0 == mst->pos);
249   while (size > 0)
250   {
251     LOG (GNUNET_ERROR_TYPE_DEBUG,
252          "Server-mst has %u bytes left in inbound buffer\n",
253          (unsigned int) size);
254     if (size < sizeof (struct GNUNET_MessageHeader))
255       break;
256     offset = (unsigned long) buf;
257     need_align = (0 != (offset % ALIGN_FACTOR)) ? GNUNET_YES : GNUNET_NO;
258     if (GNUNET_NO == need_align)
259     {
260       /* can try to do zero-copy and process directly from original buffer */
261       hdr = (const struct GNUNET_MessageHeader *) buf;
262       want = ntohs (hdr->size);
263       if (want < sizeof (struct GNUNET_MessageHeader))
264       {
265         GNUNET_break_op (0);
266         mst->off = 0;
267         return GNUNET_SYSERR;
268       }
269       if (size < want)
270         break;                  /* or not: buffer incomplete, so copy to private buffer... */
271       if (one_shot == GNUNET_SYSERR)
272       {
273         /* cannot call callback again, but return value saying that
274          * we have another full message in the buffer */
275         ret = GNUNET_NO;
276         goto copy;
277       }
278       if (one_shot == GNUNET_YES)
279         one_shot = GNUNET_SYSERR;
280       if (GNUNET_SYSERR == mst->cb (mst->cb_cls,
281                                     hdr))
282       {
283         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
284                     "Failure processing message of type %u and size %u\n",
285                     ntohs (hdr->type),
286                     ntohs (hdr->size));
287         return GNUNET_SYSERR;
288       }
289       buf += want;
290       size -= want;
291     }
292     else
293     {
294       /* need to copy to private buffer to align;
295        * yes, we go a bit more spagetti than usual here */
296       goto do_align;
297     }
298   }
299 copy:
300   if ((size > 0) && (!purge))
301   {
302     if (size + mst->pos > mst->curr_buf)
303     {
304       mst->hdr = GNUNET_realloc (mst->hdr,
305                                  size + mst->pos);
306       ibuf = (char *) mst->hdr;
307       mst->curr_buf = size + mst->pos;
308     }
309     GNUNET_assert (size + mst->pos <= mst->curr_buf);
310     GNUNET_memcpy (&ibuf[mst->pos],
311                    buf,
312                    size);
313     mst->pos += size;
314   }
315   if (purge)
316   {
317     mst->off = 0;
318     mst->pos = 0;
319   }
320   LOG (GNUNET_ERROR_TYPE_DEBUG,
321        "Server-mst leaves %u bytes in private buffer\n",
322        (unsigned int) (mst->pos - mst->off));
323   return ret;
324 }
325
326
327 /**
328  * Add incoming data to the receive buffer and call the
329  * callback for all complete messages.
330  *
331  * @param mst tokenizer to use
332  * @param buf input data to add
333  * @param size number of bytes in @a buf
334  * @param purge should any excess bytes in the buffer be discarded
335  *       (i.e. for packet-based services like UDP)
336  * @param one_shot only call callback once, keep rest of message in buffer
337  * @return #GNUNET_OK if we are done processing (need more data)
338  *         #GNUNET_NO if one_shot was set and we have another message ready
339  *         #GNUNET_SYSERR if the data stream is corrupt
340  */
341 int
342 GNUNET_MST_read (struct GNUNET_MessageStreamTokenizer *mst,
343                  struct GNUNET_NETWORK_Handle *sock,
344                  int purge,
345                  int one_shot)
346 {
347   ssize_t ret;
348   size_t left;
349   char *buf;
350
351   left = mst->curr_buf - mst->pos;
352   buf = (char *) mst->hdr;
353   ret = GNUNET_NETWORK_socket_recv (sock,
354                                     &buf[mst->pos],
355                                     left);
356   if (-1 == ret)
357   {
358     if ( (EAGAIN == errno) ||
359          (EINTR == errno) )
360       return GNUNET_OK;
361     GNUNET_log_strerror (GNUNET_ERROR_TYPE_INFO,
362                          "recv");
363     return GNUNET_SYSERR;
364   }
365   if (0 == ret)
366   {
367     /* other side closed connection, treat as error */
368     return GNUNET_SYSERR;
369   }
370   mst->pos += ret;
371   return GNUNET_MST_from_buffer (mst,
372                                  NULL,
373                                  0,
374                                  purge,
375                                  one_shot);
376 }
377
378
379 /**
380  * Obtain the next message from the @a mst, assuming that
381  * there are more unprocessed messages in the internal buffer
382  * of the @a mst.
383  *
384  * @param mst tokenizer to use
385  * @param one_shot only call callback once, keep rest of message in buffer
386  * @return #GNUNET_OK if we are done processing (need more data)
387  *         #GNUNET_NO if one_shot was set and we have another message ready
388  *         #GNUNET_SYSERR if the data stream is corrupt
389  */
390 int
391 GNUNET_MST_next (struct GNUNET_MessageStreamTokenizer *mst,
392                  int one_shot)
393 {
394   return GNUNET_MST_from_buffer (mst,
395                                  NULL,
396                                  0,
397                                  GNUNET_NO,
398                                  one_shot);
399 }
400
401
402 /**
403  * Destroys a tokenizer.
404  *
405  * @param mst tokenizer to destroy
406  */
407 void
408 GNUNET_MST_destroy (struct GNUNET_MessageStreamTokenizer *mst)
409 {
410   GNUNET_free (mst->hdr);
411   GNUNET_free (mst);
412 }
413
414
415
416 /* end of server_mst.c */