From 74333f0f17478a9a9024421cf1c5e0a664bedfb4 Mon Sep 17 00:00:00 2001 From: Davin McCall Date: Tue, 29 Dec 2015 12:14:37 +0000 Subject: [PATCH] Control stream: Use a circular receive buffer --- control.cc | 40 ++++++++++-------------- control.h | 12 +++++--- cpbuffer.h | 89 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+), 29 deletions(-) create mode 100644 cpbuffer.h diff --git a/control.cc b/control.cc index 30ce37f..cfe029e 100644 --- a/control.cc +++ b/control.cc @@ -9,14 +9,13 @@ void ControlConn::processPacket() // returns false it has either deleted the connection or marked it for deletion; we // shouldn't touch instance members after that point. - int pktType = iobuf[0]; + int pktType = rbuf[0]; if (pktType == DINIT_CP_QUERYVERSION) { // Responds with: // DINIT_RP_CVERSION, (2 byte) minimum compatible version, (2 byte) maximum compatible version char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 }; if (! queuePacket(replyBuf, 1)) return; - memmove(iobuf, iobuf + 1, 1024 - 1); - bufidx -= 1; + rbuf.consume(1); return; } if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) { @@ -41,8 +40,7 @@ void ControlConn::processPacket() } // Clear the packet from the buffer - memmove(iobuf, iobuf + 1, 1024 - 1); - bufidx -= 1; + rbuf.consume(1); chklen = 0; return; } @@ -62,13 +60,13 @@ void ControlConn::processFindLoad(int pktType) constexpr int pkt_size = 4; - if (bufidx < pkt_size) { + if (rbuf.get_length() < pkt_size) { chklen = pkt_size; return; } uint16_t svcSize; - memcpy(&svcSize, iobuf + 1, 2); + rbuf.extract((char *)&svcSize, 1, 2); chklen = svcSize + 3; if (svcSize <= 0 || chklen > 1024) { // Queue error response / mark connection bad @@ -79,14 +77,15 @@ void ControlConn::processFindLoad(int pktType) return; } - if (bufidx < chklen) { + if (rbuf.get_length() < chklen) { // packet not complete yet; read more return; } ServiceRecord * record = nullptr; - string serviceName(iobuf + 3, (size_t) svcSize); + string serviceName = std::move(rbuf.extract_string(3, svcSize)); + if (pktType == DINIT_CP_LOADSERVICE) { // LOADSERVICE try { @@ -119,8 +118,7 @@ void ControlConn::processFindLoad(int pktType) } // Clear the packet from the buffer - memmove(iobuf, iobuf + chklen, 1024 - chklen); - bufidx -= chklen; + rbuf.consume(chklen); chklen = 0; return; } @@ -131,7 +129,7 @@ void ControlConn::processStartStop(int pktType) constexpr int pkt_size = 2 + sizeof(handle_t); - if (bufidx < pkt_size) { + if (rbuf.get_length() < pkt_size) { chklen = pkt_size; return; } @@ -140,9 +138,9 @@ void ControlConn::processStartStop(int pktType) // 1 byte: pin in requested state (0 = no pin, 1 = pin) // 4 bytes: service handle - bool do_pin = (iobuf[1] == 1); + bool do_pin = (rbuf[1] == 1); handle_t handle; - memcpy(&handle, iobuf + 2, sizeof(handle)); + rbuf.extract((char *) &handle, 2, sizeof(handle)); ServiceRecord *service = findServiceForKey(handle); if (service == nullptr) { @@ -176,8 +174,7 @@ void ControlConn::processStartStop(int pktType) } // Clear the packet from the buffer - memmove(iobuf, iobuf + pkt_size, 1024 - pkt_size); - bufidx -= pkt_size; + rbuf.consume(pkt_size); chklen = 0; return; } @@ -321,9 +318,8 @@ bool ControlConn::rollbackComplete() noexcept bool ControlConn::dataReady() noexcept { int fd = iob.fd; - int buffree = 1024 - bufidx; - int r = read(fd, iobuf + bufidx, buffree); + int r = rbuf.fill(fd); // Note file descriptor is non-blocking if (r == -1) { @@ -340,11 +336,8 @@ bool ControlConn::dataReady() noexcept return true; } - bufidx += r; - buffree -= r; - // complete packet? - if (bufidx >= chklen) { + if (rbuf.get_length() >= chklen) { try { processPacket(); } @@ -353,7 +346,7 @@ bool ControlConn::dataReady() noexcept } } - if (bufidx == 1024) { + if (rbuf.get_length() == 1024) { // Too big packet // TODO log error? // TODO error response? @@ -414,7 +407,6 @@ ControlConn::~ControlConn() noexcept { close(iob.fd); ev_io_stop(loop, &iob); - delete [] iobuf; // Clear service listeners for (auto p : serviceKeyMap) { diff --git a/control.h b/control.h index 2b123de..0bce51c 100644 --- a/control.h +++ b/control.h @@ -8,9 +8,11 @@ #include #include + #include "dinit-log.h" #include "control-cmds.h" #include "service-listener.h" +#include "cpbuffer.h" // Control connection for dinit @@ -57,8 +59,8 @@ class ControlConn : private ServiceListener // processPacket() will not be called until the packet reaches this size. int chklen; - char * iobuf; - int bufidx; + // Receive buffer + CPBuffer rbuf; template using list = std::list; template using vector = std::vector; @@ -87,8 +89,10 @@ class ControlConn : private ServiceListener // std::bad_alloc - if an out-of-memory condition prevents processing void processPacket(); + // Process a STARTSERVICE/STOPSERVICE packet. May throw std::bad_alloc. void processStartStop(int pktType); + // Process a FINDSERVICE/LOADSERVICE packet. May throw std::bad_alloc. void processFindLoad(int pktType); // Notify that data is ready to be read from the socket. Returns true in cases where the @@ -148,10 +152,8 @@ class ControlConn : private ServiceListener } public: - ControlConn(struct ev_loop * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0), bufidx(0) + ControlConn(struct ev_loop * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0) { - iobuf = new char[1024]; - ev_io_init(&iob, control_conn_cb, fd, EV_READ); iob.data = this; ev_io_start(loop, &iob); diff --git a/cpbuffer.h b/cpbuffer.h new file mode 100644 index 0000000..251ef6a --- /dev/null +++ b/cpbuffer.h @@ -0,0 +1,89 @@ +#ifndef CPBUFFER_H +#define CPBUFFER_H + +#include + +// control protocol buffer, a circular buffer with 1024-byte capacity. +class CPBuffer +{ + char buf[1024]; + int cur_idx = 0; + int length = 0; // number of elements in the buffer + + public: + int get_length() noexcept + { + return length; + } + + // fill by reading from the given fd, return positive if some was read or -1 on error. + int fill(int fd) noexcept + { + int pos = cur_idx + length; + if (pos >= 1024) pos -= 1024; + int max_count = std::min(1024 - pos, 1024 - length); + ssize_t r = read(fd, buf + cur_idx, max_count); + if (r >= 0) { + length += r; + } + return r; + } + + // fill by readin from the given fd, until at least the specified number of bytes are in + // the buffer. Return 0 if end-of-file reached before fill complete, or -1 on error. + int fillTo(int fd, int rlength) noexcept + { + while (length < rlength) { + int r = fill(fd); + if (r <= 0) return r; + } + return 1; + } + + int operator[](int idx) noexcept + { + int dest_idx = cur_idx + idx; + if (dest_idx > 1024) dest_idx -= 1024; + return buf[dest_idx]; + } + + void consume(int amount) noexcept + { + cur_idx += amount; + if (cur_idx >= 1024) cur_idx -= 1024; + length -= amount; + } + + void extract(char *dest, int index, int length) noexcept + { + index += cur_idx; + if (index >= 1024) index -= 1024; + if (index + length > 1024) { + // wrap-around copy + int half = 1024 - index; + std::memcpy(dest, buf + index, half); + std::memcpy(dest + half, buf, length - half); + } + else { + std::memcpy(dest, buf + index, length); + } + } + + // Extract string of give length from given index + // Throws: std::bad_alloc on allocation failure + std::string extract_string(int index, int length) + { + index += cur_idx; + if (index >= 1024) index -= 1024; + if (index + length > 1024) { + std::string r(buf + index, 1024 - index); + r.insert(r.end(), buf, buf + length - (1024 - index)); + return r; + } + else { + return std::string(buf + index, length); + } + } +}; + +#endif -- 2.25.1