From 6bf91a087e1a14290065831ced90fc6d1db060cf Mon Sep 17 00:00:00 2001 From: Davin McCall Date: Sat, 21 Nov 2015 11:29:35 +0000 Subject: [PATCH] Add function to queue outgoing packet to a control connection. --- control.cc | 98 ++++++++++++++++++++++++++++++++++++++++++++++++++---- control.h | 31 +++++++++++++++-- 2 files changed, 121 insertions(+), 8 deletions(-) diff --git a/control.cc b/control.cc index eefca49..e34f008 100644 --- a/control.cc +++ b/control.cc @@ -1,6 +1,11 @@ #include "control.h" #include "service.h" +// TODO at the moment we rely on the readiness notification to send "bad packet" responses. +// It would probably be better to, if the outgoing buffer is empty, try and send the packet +// immediately and only queue it if necessary. However this means we would potentially +// delete 'this' object which needs to be accounted for in calling methods. (Might be better +// to return a bool indicating that delete is required). void ControlConn::processPacket() { using std::string; @@ -15,15 +20,17 @@ void ControlConn::processPacket() uint16_t svcSize; memcpy(&svcSize, iobuf + 1, 2); if (svcSize <= 0) { - // TODO error response - bufidx = 1024; // dataReady will delete - TODO clean up + // TODO queue error response + bad_conn_close = true; + ev_io_set(&iob, iob.fd, EV_WRITE); } chklen = svcSize + 3; if (chklen > 1024) { // We can't have a service name this long // TODO error response - bufidx = 1024; // TODO cleanup. + bad_conn_close = true; + ev_io_set(&iob, iob.fd, EV_WRITE); } if (bufidx < chklen) { @@ -72,16 +79,42 @@ void ControlConn::processPacket() } } +bool ControlConn::queuePacket(std::vector &&pkt) noexcept +{ + bool was_empty = outbuf.empty(); + try { + outbuf.emplace_back(pkt); + if (was_empty) { + ev_io_set(&iob, iob.fd, EV_READ | EV_WRITE); + } + return true; + } + catch (std::bad_alloc &baexc) { + // Mark the connection bad, and stop reading further requests + bad_conn_close = true; + oom_close = true; + if (was_empty) { + // TODO send out-of-memory response + delete this; + } + else { + ev_io_set(&iob, iob.fd, EV_WRITE); + } + return false; + } +} + void ControlConn::rollbackComplete() noexcept { char ackBuf[1] = { DINIT_RP_COMPLETED }; + // TODO Queue response instead of trying to write it directly like this if (write(iob.fd, ackBuf, 1) == -1) { - // TODO queue or at least re-try if error or 0 bytes written. log(LogLevel::ERROR, "Couldn't write response to control socket"); + delete this; } } -void ControlConn::dataReady() +void ControlConn::dataReady() noexcept { int fd = iob.fd; int buffree = 1024 - bufidx; @@ -108,14 +141,67 @@ void ControlConn::dataReady() // complete packet? if (bufidx >= chklen) { - processPacket(); + try { + processPacket(); + } + catch (std::bad_alloc &baexc) { + // TODO + } } if (bufidx == 1024) { // Too big packet // TODO log error? // TODO error response? + bad_conn_close = true; + ev_io_set(&iob, iob.fd, EV_WRITE); + } +} + +void ControlConn::sendData() noexcept +{ + if (outbuf.empty() && bad_conn_close) { + if (oom_close) { + // TODO send oom response + } delete this; + return; + } + + vector & pkt = outbuf.front(); + char *data = pkt.data(); + int written = write(iob.fd, data + outpkt_index, pkt.size() - outpkt_index); + if (written == -1) { + if (errno == EPIPE) { + // read end closed + delete this; + } + else if (errno == EAGAIN || errno == EWOULDBLOCK) { + // spurious readiness notification? + } + else { + // TODO log error + delete this; + } + return; + } + + outpkt_index += written; + if (outpkt_index == pkt.size()) { + // We've finished this packet, move on to the next: + outbuf.pop_front(); + outpkt_index = 0; + if (outbuf.empty()) { + if (! bad_conn_close) { + ev_io_set(&iob, iob.fd, EV_READ); + } + else { + if (oom_close) { + // TODO send out-of-memory reply if possible + } + delete this; + } + } } } diff --git a/control.h b/control.h index 90c5f0d..b5719a8 100644 --- a/control.h +++ b/control.h @@ -1,6 +1,9 @@ #ifndef DINIT_CONTROL_H #define DINIT_CONTROL_H +#include +#include + #include #include #include "dinit-log.h" @@ -37,9 +40,25 @@ class ControlConn char * iobuf; int bufidx; + bool bad_conn_close; // close when finished output? + bool oom_close; // send final 'out of memory' indicator + + template using list = std::list; + template using vector = std::vector; + + // Buffer for outgoing packets. Each outgoing back is represented as a vector. + list> outbuf; + unsigned outpkt_index = 0; + // The packet length before we need to re-check if the packet is complete int chklen; + // Queue a packet to be sent + // Returns: true if the packet was successfully queued, false if otherwise + // (eg if out of memory); in the latter case the connection might + // no longer be valid + bool queuePacket(vector &&v) noexcept; + public: ControlConn(struct ev_loop * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), bufidx(0), chklen(0) { @@ -54,7 +73,9 @@ class ControlConn void processPacket(); void rollbackComplete() noexcept; - void dataReady(); + void dataReady() noexcept; + void sendData() noexcept; + ~ControlConn() noexcept; }; @@ -63,7 +84,13 @@ class ControlConn static void control_conn_cb(struct ev_loop * loop, ev_io * w, int revents) { ControlConn *conn = (ControlConn *) w->data; - conn->dataReady(); + if (revents & EV_READ) { + conn->dataReady(); + } + // TODO issu ehere: what if above deletes the connection? + if (revents & EV_WRITE) { + conn->sendData(); + } } #endif -- 2.25.1