From ca0f9b4b25bf71b77379bc2a2ececa9c17177216 Mon Sep 17 00:00:00 2001 From: Davin McCall Date: Mon, 13 Jun 2016 22:21:08 +0100 Subject: [PATCH] Rework control protocol handling a little. --- src/control.cc | 117 +++++++++++++++++++++++++++---------------------- src/control.h | 54 +++++++++++++---------- 2 files changed, 95 insertions(+), 76 deletions(-) diff --git a/src/control.cc b/src/control.cc index acd2aa3..5c44c11 100644 --- a/src/control.cc +++ b/src/control.cc @@ -1,7 +1,7 @@ #include "control.h" #include "service.h" -void ControlConn::processPacket() +bool ControlConn::processPacket() { using std::string; @@ -14,52 +14,49 @@ void ControlConn::processPacket() // Responds with: // DINIT_RP_CVERSION, (2 byte) minimum compatible version, (2 byte) maximum compatible version char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 }; - if (! queuePacket(replyBuf, 1)) return; + if (! queuePacket(replyBuf, 1)) return false; rbuf.consume(1); - return; + return true; } if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) { - processFindLoad(pktType); - return; + return processFindLoad(pktType); } if (pktType == DINIT_CP_STARTSERVICE || pktType == DINIT_CP_STOPSERVICE || pktType == DINIT_CP_WAKESERVICE || pktType == DINIT_CP_RELEASESERVICE) { - processStartStop(pktType); - return; + return processStartStop(pktType); } if (pktType == DINIT_CP_UNPINSERVICE) { - processUnpinService(); - return; + return processUnpinService(); } if (pktType == DINIT_CP_SHUTDOWN) { // Shutdown/reboot if (rbuf.get_length() < 2) { chklen = 2; - return; + return true; } auto sd_type = static_cast(rbuf[1]); service_set->stop_all_services(sd_type); char ackBuf[] = { DINIT_RP_ACK }; - if (! queuePacket(ackBuf, 1)) return; + if (! queuePacket(ackBuf, 1)) return false; // Clear the packet from the buffer rbuf.consume(2); chklen = 0; - return; + return true; } else { // Unrecognized: give error response char outbuf[] = { DINIT_RP_BADREQ }; - if (! queuePacket(outbuf, 1)) return; + if (! queuePacket(outbuf, 1)) return false; bad_conn_close = true; iob.setWatchFlags(out_events); } - return; + return true; } -void ControlConn::processFindLoad(int pktType) +bool ControlConn::processFindLoad(int pktType) { using std::string; @@ -67,7 +64,7 @@ void ControlConn::processFindLoad(int pktType) if (rbuf.get_length() < pkt_size) { chklen = pkt_size; - return; + return true; } uint16_t svcSize; @@ -76,15 +73,15 @@ void ControlConn::processFindLoad(int pktType) if (svcSize <= 0 || chklen > 1024) { // Queue error response / mark connection bad char badreqRep[] = { DINIT_RP_BADREQ }; - if (! queuePacket(badreqRep, 1)) return; + if (! queuePacket(badreqRep, 1)) return false; bad_conn_close = true; iob.setWatchFlags(out_events); - return; + return true; } if (rbuf.get_length() < chklen) { // packet not complete yet; read more - return; + return true; } ServiceRecord * record = nullptr; @@ -116,20 +113,20 @@ void ControlConn::processFindLoad(int pktType) rp_buf.push_back(*(((char *) &handle) + i)); } rp_buf.push_back(static_cast(record->getTargetState())); - if (! queuePacket(std::move(rp_buf))) return; + if (! queuePacket(std::move(rp_buf))) return false; } else { std::vector rp_buf = { DINIT_RP_NOSERVICE }; - if (! queuePacket(std::move(rp_buf))) return; + if (! queuePacket(std::move(rp_buf))) return false; } // Clear the packet from the buffer rbuf.consume(chklen); chklen = 0; - return; + return true; } -void ControlConn::processStartStop(int pktType) +bool ControlConn::processStartStop(int pktType) { using std::string; @@ -137,7 +134,7 @@ void ControlConn::processStartStop(int pktType) if (rbuf.get_length() < pkt_size) { chklen = pkt_size; - return; + return true; } // 1 byte: packet type @@ -152,10 +149,10 @@ void ControlConn::processStartStop(int pktType) if (service == nullptr) { // Service handle is bad char badreqRep[] = { DINIT_RP_BADREQ }; - if (! queuePacket(badreqRep, 1)) return; + if (! queuePacket(badreqRep, 1)) return false; bad_conn_close = true; iob.setWatchFlags(out_events); - return; + return true; } else { bool already_there = false; @@ -192,16 +189,16 @@ void ControlConn::processStartStop(int pktType) char ack_buf[] = { (char)(already_there ? DINIT_RP_ALREADYSS : DINIT_RP_ACK) }; - if (! queuePacket(ack_buf, 1)) return; + if (! queuePacket(ack_buf, 1)) return false; } // Clear the packet from the buffer rbuf.consume(pkt_size); chklen = 0; - return; + return true; } -void ControlConn::processUnpinService() +bool ControlConn::processUnpinService() { using std::string; @@ -209,7 +206,7 @@ void ControlConn::processUnpinService() if (rbuf.get_length() < pkt_size) { chklen = pkt_size; - return; + return true; } // 1 byte: packet type @@ -222,21 +219,21 @@ void ControlConn::processUnpinService() if (service == nullptr) { // Service handle is bad char badreqRep[] = { DINIT_RP_BADREQ }; - if (! queuePacket(badreqRep, 1)) return; + if (! queuePacket(badreqRep, 1)) return false; bad_conn_close = true; iob.setWatchFlags(out_events); - return; + return true; } else { service->unpin(); char ack_buf[] = { (char) DINIT_RP_ACK }; - if (! queuePacket(ack_buf, 1)) return; + if (! queuePacket(ack_buf, 1)) return false; } // Clear the packet from the buffer rbuf.consume(pkt_size); chklen = 0; - return; + return true; } ControlConn::handle_t ControlConn::allocateServiceHandle(ServiceRecord *record) @@ -266,10 +263,11 @@ ControlConn::handle_t ControlConn::allocateServiceHandle(ServiceRecord *record) bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept { - if (bad_conn_close) return false; - + int in_flag = bad_conn_close ? 0 : in_events; bool was_empty = outbuf.empty(); + // If the queue is empty, we can try to write the packet out now rather than queueing it. + // If the write is unsuccessful or partial, we queue the remainder. if (was_empty) { int wr = write(iob.fd, pkt, size); if (wr == -1) { @@ -277,26 +275,28 @@ bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept delete this; return false; } - if (errno != EAGAIN && errno != EWOULDBLOCK) { + if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) { // TODO log error delete this; return false; } + // EAGAIN etc: fall through to below } else { if ((unsigned)wr == size) { // Ok, all written. + iob.setWatchFlags(in_flag); return true; } pkt += wr; size -= wr; } - iob.setWatchFlags(in_events | out_events); } // Create a vector out of the (remaining part of the) packet: try { outbuf.emplace_back(pkt, pkt + size); + iob.setWatchFlags(in_flag | out_events); return true; } catch (std::bad_alloc &baexc) { @@ -308,19 +308,20 @@ bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept // could above. Neither can we later send the response since we have currently // sent an incomplete packet. All we can do is close the connection. delete this; + return false; } else { iob.setWatchFlags(out_events); + return true; } - return false; } } - +// This queuePacket method is frustratingly similar to the one above, but the subtle differences +// make them extraordinary difficult to combine into a single method. bool ControlConn::queuePacket(std::vector &&pkt) noexcept { - if (bad_conn_close) return false; - + int in_flag = bad_conn_close ? 0 : in_events; bool was_empty = outbuf.empty(); if (was_empty) { @@ -332,24 +333,26 @@ bool ControlConn::queuePacket(std::vector &&pkt) noexcept delete this; return false; } - if (errno != EAGAIN && errno != EWOULDBLOCK) { + if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) { // TODO log error delete this; return false; } + // EAGAIN etc: fall through to below } else { if ((unsigned)wr == pkt.size()) { // Ok, all written. + iob.setWatchFlags(in_flag); return true; } outpkt_index = wr; } - iob.setWatchFlags(in_events | out_events); } try { outbuf.emplace_back(pkt); + iob.setWatchFlags(in_flag | out_events); return true; } catch (std::bad_alloc &baexc) { @@ -361,11 +364,12 @@ bool ControlConn::queuePacket(std::vector &&pkt) noexcept // could above. Neither can we later send the response since we have currently // sent an incomplete packet. All we can do is close the connection. delete this; + return false; } else { iob.setWatchFlags(out_events); + return true; } - return false; } } @@ -399,25 +403,29 @@ bool ControlConn::dataReady() noexcept // complete packet? if (rbuf.get_length() >= chklen) { try { - processPacket(); + return processPacket(); } catch (std::bad_alloc &baexc) { doOomClose(); + return false; } } - - if (rbuf.get_length() == 1024) { + else if (rbuf.get_length() == 1024) { // Too big packet // TODO log error? // TODO error response? bad_conn_close = true; iob.setWatchFlags(out_events); } + else { + int out_flags = (bad_conn_close || !outbuf.empty()) ? out_events : 0; + iob.setWatchFlags(in_events | out_flags); + } return false; } -void ControlConn::sendData() noexcept +bool ControlConn::sendData() noexcept { if (outbuf.empty() && bad_conn_close) { if (oom_close) { @@ -426,7 +434,7 @@ void ControlConn::sendData() noexcept write(iob.fd, oomBuf, 1); } delete this; - return; + return true; } vector & pkt = outbuf.front(); @@ -436,15 +444,17 @@ void ControlConn::sendData() noexcept if (errno == EPIPE) { // read end closed delete this; + return true; } - else if (errno == EAGAIN || errno == EWOULDBLOCK) { + else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { // spurious readiness notification? } else { log(LogLevel::ERROR, "Error writing to control connection: ", strerror(errno)); delete this; + return true; } - return; + return false; } outpkt_index += written; @@ -458,9 +468,12 @@ void ControlConn::sendData() noexcept } else { delete this; + return true; } } } + + return false; } ControlConn::~ControlConn() noexcept diff --git a/src/control.h b/src/control.h index 943f7b9..260c10d 100644 --- a/src/control.h +++ b/src/control.h @@ -25,7 +25,7 @@ class ControlConn; class ControlConnWatcher; // forward-declaration of callback: -static void control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents); +static dasync::Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents); // Pointer to the control connection that is listening for rollback completion extern ControlConn * rollback_handler_conn; @@ -64,33 +64,29 @@ class ControlConnWatcher : public PosixBidiFdWatcher public: int fd; // TODO this is already stored, find a better way to access it. EventLoop_t * eventLoop; - int watch_flags; - void setWatchFlags(int flags) noexcept + void setWatchFlags(int flags) { PosixBidiFdWatcher::setWatchFlags(eventLoop, flags); - watch_flags = flags; } void registerWith(EventLoop_t *loop, int fd, int flags) { this->fd = fd; + this->eventLoop = loop; PosixBidiFdWatcher::registerWith(loop, fd, flags); - watch_flags = flags; } }; inline Rearm ControlConnWatcher::receiveEvent(EventLoop_t * loop, int fd, int flags) noexcept { - PosixBidiFdWatcher::setWatchFlags(loop, watch_flags); - control_conn_cb(loop, this, flags); - return Rearm::NOOP; + return control_conn_cb(loop, this, flags); } class ControlConn : private ServiceListener { - friend void control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents); + friend dasync::Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents); ControlConnWatcher iob; EventLoop_t *loop; @@ -121,32 +117,36 @@ class ControlConn : private ServiceListener unsigned outpkt_index = 0; // Queue a packet to be sent - // Returns: true if the packet was successfully queued, false if otherwise - // (eg if out of memory); in the latter case the connection might - // no longer be valid (iff there are no outgoing packets queued). + // Returns: false if the packet could not be queued and the connection has been + // destroyed; + // true (with bad_conn_close == false) if the packet was successfully + // queued; + // true (with bad_conn_close == true) if the packet was not successfully + // queued. + // The in/out watch enabled state will also be set appropriately. bool queuePacket(vector &&v) noexcept; bool queuePacket(const char *pkt, unsigned size) noexcept; // Process a packet. Can cause the ControlConn to be deleted iff there are no - // outgoing packets queued. + // outgoing packets queued (returns false). // Throws: // std::bad_alloc - if an out-of-memory condition prevents processing - void processPacket(); + bool processPacket(); // Process a STARTSERVICE/STOPSERVICE packet. May throw std::bad_alloc. - void processStartStop(int pktType); + bool processStartStop(int pktType); // Process a FINDSERVICE/LOADSERVICE packet. May throw std::bad_alloc. - void processFindLoad(int pktType); + bool processFindLoad(int pktType); // Process an UNPINSERVICE packet. May throw std::bad_alloc. - void processUnpinService(); + bool processUnpinService(); - // Notify that data is ready to be read from the socket. Returns true in cases where the - // connection was deleted with potentially pending outgoing packets. + // Notify that data is ready to be read from the socket. Returns true if the connection was + // deleted. bool dataReady() noexcept; - void sendData() noexcept; + bool sendData() noexcept; // Allocate a new handle for a service; may throw std::bad_alloc handle_t allocateServiceHandle(ServiceRecord *record); @@ -211,19 +211,25 @@ class ControlConn : private ServiceListener }; -static void control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents) +static dasync::Rearm control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents) { + using Rearm = dasync::Rearm; + char * cc_addr = (reinterpret_cast(watcher)) - offsetof(ControlConn, iob); ControlConn *conn = reinterpret_cast(cc_addr); if (revents & in_events) { if (conn->dataReady()) { // ControlConn was deleted - return; + return Rearm::REMOVED; } } if (revents & out_events) { - conn->sendData(); - } + if (conn->sendData()) { + return Rearm::REMOVED; + } + } + + return Rearm::NOOP; } #endif -- 2.25.1