Control stream: Use a circular receive buffer
authorDavin McCall <davmac@davmac.org>
Tue, 29 Dec 2015 12:14:37 +0000 (12:14 +0000)
committerDavin McCall <davmac@davmac.org>
Tue, 29 Dec 2015 12:14:37 +0000 (12:14 +0000)
control.cc
control.h
cpbuffer.h [new file with mode: 0644]

index 30ce37f3c1c250830c054c5df1b491cc43e26c42..cfe029e9cc8b7f25f5e37696cfad3e9e04c4a162 100644 (file)
@@ -9,14 +9,13 @@ void ControlConn::processPacket()
     // returns false it has either deleted the connection or marked it for deletion; we
     // shouldn't touch instance members after that point.
 
-    int pktType = iobuf[0];
+    int pktType = rbuf[0];
     if (pktType == DINIT_CP_QUERYVERSION) {
         // Responds with:
         // DINIT_RP_CVERSION, (2 byte) minimum compatible version, (2 byte) maximum compatible version
         char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 };
         if (! queuePacket(replyBuf, 1)) return;
-        memmove(iobuf, iobuf + 1, 1024 - 1);
-        bufidx -= 1;
+        rbuf.consume(1);
         return;
     }
     if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) {
@@ -41,8 +40,7 @@ void ControlConn::processPacket()
         }
         
         // Clear the packet from the buffer
-        memmove(iobuf, iobuf + 1, 1024 - 1);
-        bufidx -= 1;
+        rbuf.consume(1);
         chklen = 0;
         return;
     }
@@ -62,13 +60,13 @@ void ControlConn::processFindLoad(int pktType)
     
     constexpr int pkt_size = 4;
     
-    if (bufidx < pkt_size) {
+    if (rbuf.get_length() < pkt_size) {
         chklen = pkt_size;
         return;
     }
     
     uint16_t svcSize;
-    memcpy(&svcSize, iobuf + 1, 2);
+    rbuf.extract((char *)&svcSize, 1, 2);
     chklen = svcSize + 3;
     if (svcSize <= 0 || chklen > 1024) {
         // Queue error response / mark connection bad
@@ -79,14 +77,15 @@ void ControlConn::processFindLoad(int pktType)
         return;
     }
     
-    if (bufidx < chklen) {
+    if (rbuf.get_length() < chklen) {
         // packet not complete yet; read more
         return;
     }
     
     ServiceRecord * record = nullptr;
     
-    string serviceName(iobuf + 3, (size_t) svcSize);
+    string serviceName = std::move(rbuf.extract_string(3, svcSize));
+    
     if (pktType == DINIT_CP_LOADSERVICE) {
         // LOADSERVICE
         try {
@@ -119,8 +118,7 @@ void ControlConn::processFindLoad(int pktType)
     }
     
     // Clear the packet from the buffer
-    memmove(iobuf, iobuf + chklen, 1024 - chklen);
-    bufidx -= chklen;
+    rbuf.consume(chklen);
     chklen = 0;
     return;
 }
@@ -131,7 +129,7 @@ void ControlConn::processStartStop(int pktType)
     
     constexpr int pkt_size = 2 + sizeof(handle_t);
     
-    if (bufidx < pkt_size) {
+    if (rbuf.get_length() < pkt_size) {
         chklen = pkt_size;
         return;
     }
@@ -140,9 +138,9 @@ void ControlConn::processStartStop(int pktType)
     // 1 byte: pin in requested state (0 = no pin, 1 = pin)
     // 4 bytes: service handle
     
-    bool do_pin = (iobuf[1] == 1);
+    bool do_pin = (rbuf[1] == 1);
     handle_t handle;
-    memcpy(&handle, iobuf + 2, sizeof(handle));
+    rbuf.extract((char *) &handle, 2, sizeof(handle));
     
     ServiceRecord *service = findServiceForKey(handle);
     if (service == nullptr) {
@@ -176,8 +174,7 @@ void ControlConn::processStartStop(int pktType)
     }
     
     // Clear the packet from the buffer
-    memmove(iobuf, iobuf + pkt_size, 1024 - pkt_size);
-    bufidx -= pkt_size;
+    rbuf.consume(pkt_size);
     chklen = 0;
     return;
 }
@@ -321,9 +318,8 @@ bool ControlConn::rollbackComplete() noexcept
 bool ControlConn::dataReady() noexcept
 {
     int fd = iob.fd;
-    int buffree = 1024 - bufidx;
     
-    int r = read(fd, iobuf + bufidx, buffree);
+    int r = rbuf.fill(fd);
     
     // Note file descriptor is non-blocking
     if (r == -1) {
@@ -340,11 +336,8 @@ bool ControlConn::dataReady() noexcept
         return true;
     }
     
-    bufidx += r;
-    buffree -= r;
-    
     // complete packet?
-    if (bufidx >= chklen) {
+    if (rbuf.get_length() >= chklen) {
         try {
             processPacket();
         }
@@ -353,7 +346,7 @@ bool ControlConn::dataReady() noexcept
         }
     }
     
-    if (bufidx == 1024) {
+    if (rbuf.get_length() == 1024) {
         // Too big packet
         // TODO log error?
         // TODO error response?
@@ -414,7 +407,6 @@ ControlConn::~ControlConn() noexcept
 {
     close(iob.fd);
     ev_io_stop(loop, &iob);
-    delete [] iobuf;
     
     // Clear service listeners
     for (auto p : serviceKeyMap) {
index 2b123de22bbb9d2e53eb7348545de8f529b9ab6b..0bce51c038215274c26dc979ea2319b6ad211b42 100644 (file)
--- a/control.h
+++ b/control.h
@@ -8,9 +8,11 @@
 
 #include <unistd.h>
 #include <ev++.h>
+
 #include "dinit-log.h"
 #include "control-cmds.h"
 #include "service-listener.h"
+#include "cpbuffer.h"
 
 // Control connection for dinit
 
@@ -57,8 +59,8 @@ class ControlConn : private ServiceListener
     // processPacket() will not be called until the packet reaches this size.
     int chklen;
     
-    char * iobuf;
-    int bufidx;
+    // Receive buffer
+    CPBuffer rbuf;
     
     template <typename T> using list = std::list<T>;
     template <typename T> using vector = std::vector<T>;
@@ -87,8 +89,10 @@ class ControlConn : private ServiceListener
     //    std::bad_alloc - if an out-of-memory condition prevents processing
     void processPacket();
     
+    // Process a STARTSERVICE/STOPSERVICE packet. May throw std::bad_alloc.
     void processStartStop(int pktType);
     
+    // Process a FINDSERVICE/LOADSERVICE packet. May throw std::bad_alloc.
     void processFindLoad(int pktType);
 
     // Notify that data is ready to be read from the socket. Returns true in cases where the
@@ -148,10 +152,8 @@ class ControlConn : private ServiceListener
     }
     
     public:
-    ControlConn(struct ev_loop * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0), bufidx(0)
+    ControlConn(struct ev_loop * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0)
     {
-        iobuf = new char[1024];
-    
         ev_io_init(&iob, control_conn_cb, fd, EV_READ);
         iob.data = this;
         ev_io_start(loop, &iob);
diff --git a/cpbuffer.h b/cpbuffer.h
new file mode 100644 (file)
index 0000000..251ef6a
--- /dev/null
@@ -0,0 +1,89 @@
+#ifndef CPBUFFER_H
+#define CPBUFFER_H
+
+#include <cstring>
+
+// control protocol buffer, a circular buffer with 1024-byte capacity.
+class CPBuffer
+{
+    char buf[1024];
+    int cur_idx = 0;
+    int length = 0;  // number of elements in the buffer
+    
+    public:
+    int get_length() noexcept
+    {
+        return length;
+    }
+    
+    // fill by reading from the given fd, return positive if some was read or -1 on error.
+    int fill(int fd) noexcept
+    {
+        int pos = cur_idx + length;
+        if (pos >= 1024) pos -= 1024;
+        int max_count = std::min(1024 - pos, 1024 - length);
+        ssize_t r = read(fd, buf + cur_idx, max_count);
+        if (r >= 0) {
+            length += r;
+        }
+        return r;
+    }
+    
+    // fill by readin from the given fd, until at least the specified number of bytes are in
+    // the buffer. Return 0 if end-of-file reached before fill complete, or -1 on error.
+    int fillTo(int fd, int rlength) noexcept
+    {
+        while (length < rlength) {
+            int r = fill(fd);
+            if (r <= 0) return r;
+        }
+        return 1;
+    }
+    
+    int operator[](int idx) noexcept
+    {
+        int dest_idx = cur_idx + idx;
+        if (dest_idx > 1024) dest_idx -= 1024;
+        return buf[dest_idx];
+    }
+    
+    void consume(int amount) noexcept
+    {
+        cur_idx += amount;
+        if (cur_idx >= 1024) cur_idx -= 1024;
+        length -= amount;
+    }
+    
+    void extract(char *dest, int index, int length) noexcept
+    {
+        index += cur_idx;
+        if (index >= 1024) index -= 1024;
+        if (index + length > 1024) {
+            // wrap-around copy
+            int half = 1024 - index;
+            std::memcpy(dest, buf + index, half);
+            std::memcpy(dest + half, buf, length - half);
+        }
+        else {
+            std::memcpy(dest, buf + index, length);
+        }
+    }
+    
+    // Extract string of give length from given index
+    // Throws:  std::bad_alloc on allocation failure
+    std::string extract_string(int index, int length)
+    {
+        index += cur_idx;
+        if (index >= 1024) index -= 1024;
+        if (index + length > 1024) {
+            std::string r(buf + index, 1024 - index);
+            r.insert(r.end(), buf, buf + length - (1024 - index));
+            return r;
+        }
+        else {
+            return std::string(buf + index, length);
+        }
+    }
+};
+
+#endif