mst fixes
[oweals/gnunet.git] / src / util / server_mst.c
index 53cae7035577c79d95db697b143de5c3ea61f44f..27c95815a40b53c088b2dab534e7830203ff8c39 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2009 Christian Grothoff (and other contributing authors)
+     (C) 2010 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 #include "gnunet_server_lib.h"
 #include "gnunet_time_lib.h"
 
+#if HAVE_UNALIGNED_64_ACCESS
+#define ALIGN_FACTOR 4
+#else
+#define ALIGN_FACTOR 8
+#endif
+
 
 /**
  * Handle to a message stream tokenizer.
 struct GNUNET_SERVER_MessageStreamTokenizer
 {
 
-  size_t maxbuf;
-
-  size_t off;
+  /**
+   * Function to call on completed messages.
+   */
+  GNUNET_SERVER_MessageTokenizerCallback cb;
+  
+  /**
+   * Closure for cb.
+   */
+  void *cb_cls;
 
+  /**
+   * Client to pass to cb.
+   */
   void *client_identity;
 
-  GNUNET_SERVER_MessageTokenizerCallback cb;
+  /**
+   * Size of the buffer (starting at 'hdr').
+   */
+  size_t maxbuf;
 
-  void *cb_cls;
+  /**
+   * How many bytes in buffer have we already processed?
+   */
+  size_t off;
+
+  /**
+   * How many bytes in buffer are valid right now?
+   */
+  size_t pos;
 
   /**
-   * Beginning of the buffer.
+   * Beginning of the buffer.  Typed like this to force alignment.
    */
   struct GNUNET_MessageHeader hdr;
 
@@ -64,6 +90,8 @@ struct GNUNET_SERVER_MessageStreamTokenizer
  *    GNUNET_SERVER_MAX_MESSAGE_SIZE)
  * @param client_identity ID of client for which this is a buffer,
  *        can be NULL (will be passed back to 'cb')
+ * @param cb function to call on completed messages
+ * @param cb_cls closure for cb
  * @return handle to tokenizer
  */
 struct GNUNET_SERVER_MessageStreamTokenizer *
@@ -92,107 +120,158 @@ GNUNET_SERVER_mst_create (size_t maxbuf,
  * @param size number of bytes in buf
  * @param purge should any excess bytes in the buffer be discarded 
  *       (i.e. for packet-based services like UDP)
- * @return GNUNET_NO if the data stream is corrupt 
- *         GNUNET_SYSERR if the data stream is corrupt beyond repair
+ * @param one_shot only call callback once, keep rest of message in buffer
+ * @return GNUNET_OK if we are done processing (need more data)
+ *         GNUNET_NO if one_shot was set and we have another message ready
+ *         GNUNET_SYSERR if the data stream is corrupt
  */
 int
 GNUNET_SERVER_mst_receive (struct GNUNET_SERVER_MessageStreamTokenizer *mst,
                           const char *buf,
                           size_t size,
-                          int purge)
+                          int purge,
+                          int one_shot)
 {
   const struct GNUNET_MessageHeader *hdr;
   size_t delta;
-  size_t want;
+  uint16_t want;
   char *ibuf;
   int need_align;
   unsigned long offset;
+  int ret;
 
+  ret = GNUNET_OK;
   ibuf = (char*) &mst->hdr;
-  if (mst->off > 0)
+  if (mst->pos > 0)
     {
     do_align:
-      if (mst->off < sizeof (struct GNUNET_MessageHeader))
+      if ( (mst->maxbuf - mst->off < sizeof (struct GNUNET_MessageHeader)) ||
+          (0 != (mst->off % ALIGN_FACTOR)) )
+       {
+         /* need to align or need more space */
+         mst->pos -= mst->off;
+         memmove (ibuf,
+                  &ibuf[mst->off],
+                  mst->pos);
+         mst->off = 0;
+       }
+      if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
        {
-         delta = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - mst->off,
+         delta = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader) - (mst->pos - mst->off),
                              size);
-         memcpy (&ibuf[mst->off],
+         memcpy (&ibuf[mst->pos],
                  buf,
                  delta);
-         mst->off += delta;
+         mst->pos += delta;
          buf += delta;
          size -= delta;
        }
-      if (mst->off < sizeof (struct GNUNET_MessageHeader))
+      if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
        {
          if (purge)
-           mst->off = 0;    
+           {
+             mst->off = 0;    
+             mst->pos = 0;
+           }
          return GNUNET_OK;
        }
-      want = ntohs (mst->hdr.size);
+      hdr = (const struct GNUNET_MessageHeader*) &ibuf[mst->off];
+      want = ntohs (hdr->size);
       if (want < sizeof (struct GNUNET_MessageHeader))
        {
          GNUNET_break_op (0);
-         if (purge)
-           return GNUNET_NO;
          return GNUNET_SYSERR;
        }
-      if (want < mst->off)
+      if (mst->maxbuf - mst->off < want)
        {
-         delta = GNUNET_MIN (want - mst->off,
+         /* need more space */
+         mst->pos -= mst->off;
+         memmove (ibuf,
+                  &ibuf[mst->off],
+                  mst->pos);
+         mst->off = 0;
+       }
+      if (mst->pos - mst->off < want)
+       {
+         delta = GNUNET_MIN (want - (mst->pos - mst->off),
                              size);
-         memcpy (&ibuf[mst->off],
+         memcpy (&ibuf[mst->pos],
                  buf,
                  delta);
-         mst->off += delta;
+         mst->pos += delta;
          buf += delta;
          size -= delta;
        }
-      if (want < mst->off)
+      if (mst->pos - mst->off < want)
        {
          if (purge)
-           mst->off = 0;    
+           {
+             mst->off = 0;    
+             mst->pos = 0;
+           }
          return GNUNET_OK;
        }
-      mst->cb (mst->cb_cls, mst->client_identity, &mst->hdr);
-      mst->off = 0;
+      if (one_shot == GNUNET_SYSERR)
+       {
+         /* cannot call callback again, but return value saying that
+            we have another full message in the buffer */
+         ret = GNUNET_NO;
+         goto copy;
+       }
+      if (one_shot == GNUNET_YES)
+       one_shot = GNUNET_SYSERR;
+      mst->cb (mst->cb_cls, mst->client_identity, hdr);
+      mst->off += want;
+      if (mst->off == mst->pos)
+       {
+         /* reset to beginning of buffer, it's free right now! */
+         mst->off = 0;
+         mst->pos = 0;
+       }
     }
   while (size > 0)
     {
       if (size < sizeof (struct GNUNET_MessageHeader))
        break;
       offset = (unsigned long) buf;
-#if HAVE_UNALIGNED_64_ACCESS
-      need_align = (0 != offset % 4) ? GNUNET_YES : GNUNET_NO;
-#else
-      need_align = (0 != offset % 8) ? GNUNET_YES : GNUNET_NO;
-#endif
+      need_align = (0 != offset % ALIGN_FACTOR) ? GNUNET_YES : GNUNET_NO;
       if (GNUNET_NO == need_align)
        {
-         /* can try to do zero-copy */
+         /* can try to do zero-copy and process directly from original buffer */
          hdr = (const struct GNUNET_MessageHeader *) buf;
          want = ntohs (hdr->size);
          if (size < want)
-           break; /* or not, buffer incomplete... */
+           break; /* or not, buffer incomplete, so copy to private buffer... */
+         if (one_shot == GNUNET_SYSERR)
+           {
+             /* cannot call callback again, but return value saying that
+                we have another full message in the buffer */
+             ret = GNUNET_NO;
+             goto copy;
+           }
+         if (one_shot == GNUNET_YES)
+           one_shot = GNUNET_SYSERR;
          mst->cb (mst->cb_cls, mst->client_identity, hdr);
          buf += want;
          size -= want;
        }
       else
        {
-         /* yes, we go a bit more spagetti than usual here */
+         /* need to copy to private buffer to align;
+            yes, we go a bit more spagetti than usual here */
          goto do_align;
        }
     }
+ copy:
   if ( (size > 0) && (! purge) )
     {
-      memcpy (&mst->hdr, buf, size);
-      mst->off = size;
-      size = 0;
+      GNUNET_assert (mst->pos + size <= mst->maxbuf);
+      memcpy (&ibuf[mst->pos], buf, size);
+      mst->pos += size;
     }
   if (purge)
     mst->off = 0;    
-  return GNUNET_OK;
+  return ret;
 }