// returns false it has either deleted the connection or marked it for deletion; we
// shouldn't touch instance members after that point.
- int pktType = iobuf[0];
+ int pktType = rbuf[0];
if (pktType == DINIT_CP_QUERYVERSION) {
// Responds with:
// DINIT_RP_CVERSION, (2 byte) minimum compatible version, (2 byte) maximum compatible version
char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 };
if (! queuePacket(replyBuf, 1)) return;
- memmove(iobuf, iobuf + 1, 1024 - 1);
- bufidx -= 1;
+ rbuf.consume(1);
return;
}
if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) {
}
// Clear the packet from the buffer
- memmove(iobuf, iobuf + 1, 1024 - 1);
- bufidx -= 1;
+ rbuf.consume(1);
chklen = 0;
return;
}
constexpr int pkt_size = 4;
- if (bufidx < pkt_size) {
+ if (rbuf.get_length() < pkt_size) {
chklen = pkt_size;
return;
}
uint16_t svcSize;
- memcpy(&svcSize, iobuf + 1, 2);
+ rbuf.extract((char *)&svcSize, 1, 2);
chklen = svcSize + 3;
if (svcSize <= 0 || chklen > 1024) {
// Queue error response / mark connection bad
return;
}
- if (bufidx < chklen) {
+ if (rbuf.get_length() < chklen) {
// packet not complete yet; read more
return;
}
ServiceRecord * record = nullptr;
- string serviceName(iobuf + 3, (size_t) svcSize);
+ string serviceName = std::move(rbuf.extract_string(3, svcSize));
+
if (pktType == DINIT_CP_LOADSERVICE) {
// LOADSERVICE
try {
}
// Clear the packet from the buffer
- memmove(iobuf, iobuf + chklen, 1024 - chklen);
- bufidx -= chklen;
+ rbuf.consume(chklen);
chklen = 0;
return;
}
constexpr int pkt_size = 2 + sizeof(handle_t);
- if (bufidx < pkt_size) {
+ if (rbuf.get_length() < pkt_size) {
chklen = pkt_size;
return;
}
// 1 byte: pin in requested state (0 = no pin, 1 = pin)
// 4 bytes: service handle
- bool do_pin = (iobuf[1] == 1);
+ bool do_pin = (rbuf[1] == 1);
handle_t handle;
- memcpy(&handle, iobuf + 2, sizeof(handle));
+ rbuf.extract((char *) &handle, 2, sizeof(handle));
ServiceRecord *service = findServiceForKey(handle);
if (service == nullptr) {
}
// Clear the packet from the buffer
- memmove(iobuf, iobuf + pkt_size, 1024 - pkt_size);
- bufidx -= pkt_size;
+ rbuf.consume(pkt_size);
chklen = 0;
return;
}
bool ControlConn::dataReady() noexcept
{
int fd = iob.fd;
- int buffree = 1024 - bufidx;
- int r = read(fd, iobuf + bufidx, buffree);
+ int r = rbuf.fill(fd);
// Note file descriptor is non-blocking
if (r == -1) {
return true;
}
- bufidx += r;
- buffree -= r;
-
// complete packet?
- if (bufidx >= chklen) {
+ if (rbuf.get_length() >= chklen) {
try {
processPacket();
}
}
}
- if (bufidx == 1024) {
+ if (rbuf.get_length() == 1024) {
// Too big packet
// TODO log error?
// TODO error response?
{
close(iob.fd);
ev_io_stop(loop, &iob);
- delete [] iobuf;
// Clear service listeners
for (auto p : serviceKeyMap) {
#include <unistd.h>
#include <ev++.h>
+
#include "dinit-log.h"
#include "control-cmds.h"
#include "service-listener.h"
+#include "cpbuffer.h"
// Control connection for dinit
// processPacket() will not be called until the packet reaches this size.
int chklen;
- char * iobuf;
- int bufidx;
+ // Receive buffer
+ CPBuffer rbuf;
template <typename T> using list = std::list<T>;
template <typename T> using vector = std::vector<T>;
// std::bad_alloc - if an out-of-memory condition prevents processing
void processPacket();
+ // Process a STARTSERVICE/STOPSERVICE packet. May throw std::bad_alloc.
void processStartStop(int pktType);
+ // Process a FINDSERVICE/LOADSERVICE packet. May throw std::bad_alloc.
void processFindLoad(int pktType);
// Notify that data is ready to be read from the socket. Returns true in cases where the
}
public:
- ControlConn(struct ev_loop * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0), bufidx(0)
+ ControlConn(struct ev_loop * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0)
{
- iobuf = new char[1024];
-
ev_io_init(&iob, control_conn_cb, fd, EV_READ);
iob.data = this;
ev_io_start(loop, &iob);
--- /dev/null
+#ifndef CPBUFFER_H
+#define CPBUFFER_H
+
+#include <cstring>
+
+// control protocol buffer, a circular buffer with 1024-byte capacity.
+class CPBuffer
+{
+ char buf[1024];
+ int cur_idx = 0;
+ int length = 0; // number of elements in the buffer
+
+ public:
+ int get_length() noexcept
+ {
+ return length;
+ }
+
+ // fill by reading from the given fd, return positive if some was read or -1 on error.
+ int fill(int fd) noexcept
+ {
+ int pos = cur_idx + length;
+ if (pos >= 1024) pos -= 1024;
+ int max_count = std::min(1024 - pos, 1024 - length);
+ ssize_t r = read(fd, buf + cur_idx, max_count);
+ if (r >= 0) {
+ length += r;
+ }
+ return r;
+ }
+
+ // fill by readin from the given fd, until at least the specified number of bytes are in
+ // the buffer. Return 0 if end-of-file reached before fill complete, or -1 on error.
+ int fillTo(int fd, int rlength) noexcept
+ {
+ while (length < rlength) {
+ int r = fill(fd);
+ if (r <= 0) return r;
+ }
+ return 1;
+ }
+
+ int operator[](int idx) noexcept
+ {
+ int dest_idx = cur_idx + idx;
+ if (dest_idx > 1024) dest_idx -= 1024;
+ return buf[dest_idx];
+ }
+
+ void consume(int amount) noexcept
+ {
+ cur_idx += amount;
+ if (cur_idx >= 1024) cur_idx -= 1024;
+ length -= amount;
+ }
+
+ void extract(char *dest, int index, int length) noexcept
+ {
+ index += cur_idx;
+ if (index >= 1024) index -= 1024;
+ if (index + length > 1024) {
+ // wrap-around copy
+ int half = 1024 - index;
+ std::memcpy(dest, buf + index, half);
+ std::memcpy(dest + half, buf, length - half);
+ }
+ else {
+ std::memcpy(dest, buf + index, length);
+ }
+ }
+
+ // Extract string of give length from given index
+ // Throws: std::bad_alloc on allocation failure
+ std::string extract_string(int index, int length)
+ {
+ index += cur_idx;
+ if (index >= 1024) index -= 1024;
+ if (index + length > 1024) {
+ std::string r(buf + index, 1024 - index);
+ r.insert(r.end(), buf, buf + length - (1024 - index));
+ return r;
+ }
+ else {
+ return std::string(buf + index, length);
+ }
+ }
+};
+
+#endif