Rework control protocol handling a little.
authorDavin McCall <davmac@davmac.org>
Mon, 13 Jun 2016 21:21:08 +0000 (22:21 +0100)
committerDavin McCall <davmac@davmac.org>
Mon, 13 Jun 2016 21:21:08 +0000 (22:21 +0100)
src/control.cc
src/control.h

index acd2aa381bd22e6450f30d3788dc10e7f8f30322..5c44c11a873f1e85f194dcf6cf94481a5f8c03e0 100644 (file)
@@ -1,7 +1,7 @@
 #include "control.h"
 #include "service.h"
 
-void ControlConn::processPacket()
+bool ControlConn::processPacket()
 {
     using std::string;
     
@@ -14,52 +14,49 @@ void ControlConn::processPacket()
         // Responds with:
         // DINIT_RP_CVERSION, (2 byte) minimum compatible version, (2 byte) maximum compatible version
         char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 };
-        if (! queuePacket(replyBuf, 1)) return;
+        if (! queuePacket(replyBuf, 1)) return false;
         rbuf.consume(1);
-        return;
+        return true;
     }
     if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) {
-        processFindLoad(pktType);
-        return;
+        return processFindLoad(pktType);
     }
     if (pktType == DINIT_CP_STARTSERVICE || pktType == DINIT_CP_STOPSERVICE
             || pktType == DINIT_CP_WAKESERVICE || pktType == DINIT_CP_RELEASESERVICE) {
-        processStartStop(pktType);
-        return;
+        return processStartStop(pktType);
     }
     if (pktType == DINIT_CP_UNPINSERVICE) {
-        processUnpinService();
-        return;
+        return processUnpinService();
     }
     if (pktType == DINIT_CP_SHUTDOWN) {
         // Shutdown/reboot
         if (rbuf.get_length() < 2) {
             chklen = 2;
-            return;
+            return true;
         }
         
         auto sd_type = static_cast<ShutdownType>(rbuf[1]);
         
         service_set->stop_all_services(sd_type);
         char ackBuf[] = { DINIT_RP_ACK };
-        if (! queuePacket(ackBuf, 1)) return;
+        if (! queuePacket(ackBuf, 1)) return false;
         
         // Clear the packet from the buffer
         rbuf.consume(2);
         chklen = 0;
-        return;
+        return true;
     }
     else {
         // Unrecognized: give error response
         char outbuf[] = { DINIT_RP_BADREQ };
-        if (! queuePacket(outbuf, 1)) return;
+        if (! queuePacket(outbuf, 1)) return false;
         bad_conn_close = true;
         iob.setWatchFlags(out_events);
     }
-    return;
+    return true;
 }
 
-void ControlConn::processFindLoad(int pktType)
+bool ControlConn::processFindLoad(int pktType)
 {
     using std::string;
     
@@ -67,7 +64,7 @@ void ControlConn::processFindLoad(int pktType)
     
     if (rbuf.get_length() < pkt_size) {
         chklen = pkt_size;
-        return;
+        return true;
     }
     
     uint16_t svcSize;
@@ -76,15 +73,15 @@ void ControlConn::processFindLoad(int pktType)
     if (svcSize <= 0 || chklen > 1024) {
         // Queue error response / mark connection bad
         char badreqRep[] = { DINIT_RP_BADREQ };
-        if (! queuePacket(badreqRep, 1)) return;
+        if (! queuePacket(badreqRep, 1)) return false;
         bad_conn_close = true;
         iob.setWatchFlags(out_events);
-        return;
+        return true;
     }
     
     if (rbuf.get_length() < chklen) {
         // packet not complete yet; read more
-        return;
+        return true;
     }
     
     ServiceRecord * record = nullptr;
@@ -116,20 +113,20 @@ void ControlConn::processFindLoad(int pktType)
             rp_buf.push_back(*(((char *) &handle) + i));
         }
         rp_buf.push_back(static_cast<char>(record->getTargetState()));
-        if (! queuePacket(std::move(rp_buf))) return;
+        if (! queuePacket(std::move(rp_buf))) return false;
     }
     else {
         std::vector<char> rp_buf = { DINIT_RP_NOSERVICE };
-        if (! queuePacket(std::move(rp_buf))) return;
+        if (! queuePacket(std::move(rp_buf))) return false;
     }
     
     // Clear the packet from the buffer
     rbuf.consume(chklen);
     chklen = 0;
-    return;
+    return true;
 }
 
-void ControlConn::processStartStop(int pktType)
+bool ControlConn::processStartStop(int pktType)
 {
     using std::string;
     
@@ -137,7 +134,7 @@ void ControlConn::processStartStop(int pktType)
     
     if (rbuf.get_length() < pkt_size) {
         chklen = pkt_size;
-        return;
+        return true;
     }
     
     // 1 byte: packet type
@@ -152,10 +149,10 @@ void ControlConn::processStartStop(int pktType)
     if (service == nullptr) {
         // Service handle is bad
         char badreqRep[] = { DINIT_RP_BADREQ };
-        if (! queuePacket(badreqRep, 1)) return;
+        if (! queuePacket(badreqRep, 1)) return false;
         bad_conn_close = true;
         iob.setWatchFlags(out_events);
-        return;
+        return true;
     }
     else {
         bool already_there = false;
@@ -192,16 +189,16 @@ void ControlConn::processStartStop(int pktType)
         
         char ack_buf[] = { (char)(already_there ? DINIT_RP_ALREADYSS : DINIT_RP_ACK) };
         
-        if (! queuePacket(ack_buf, 1)) return;
+        if (! queuePacket(ack_buf, 1)) return false;
     }
     
     // Clear the packet from the buffer
     rbuf.consume(pkt_size);
     chklen = 0;
-    return;
+    return true;
 }
 
-void ControlConn::processUnpinService()
+bool ControlConn::processUnpinService()
 {
     using std::string;
     
@@ -209,7 +206,7 @@ void ControlConn::processUnpinService()
     
     if (rbuf.get_length() < pkt_size) {
         chklen = pkt_size;
-        return;
+        return true;
     }
     
     // 1 byte: packet type
@@ -222,21 +219,21 @@ void ControlConn::processUnpinService()
     if (service == nullptr) {
         // Service handle is bad
         char badreqRep[] = { DINIT_RP_BADREQ };
-        if (! queuePacket(badreqRep, 1)) return;
+        if (! queuePacket(badreqRep, 1)) return false;
         bad_conn_close = true;
         iob.setWatchFlags(out_events);
-        return;
+        return true;
     }
     else {
         service->unpin();
         char ack_buf[] = { (char) DINIT_RP_ACK };
-        if (! queuePacket(ack_buf, 1)) return;
+        if (! queuePacket(ack_buf, 1)) return false;
     }
     
     // Clear the packet from the buffer
     rbuf.consume(pkt_size);
     chklen = 0;
-    return;
+    return true;
 }
 
 ControlConn::handle_t ControlConn::allocateServiceHandle(ServiceRecord *record)
@@ -266,10 +263,11 @@ ControlConn::handle_t ControlConn::allocateServiceHandle(ServiceRecord *record)
 
 bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept
 {
-    if (bad_conn_close) return false;
-
+    int in_flag = bad_conn_close ? 0 : in_events;
     bool was_empty = outbuf.empty();
 
+    // If the queue is empty, we can try to write the packet out now rather than queueing it.
+    // If the write is unsuccessful or partial, we queue the remainder.
     if (was_empty) {
         int wr = write(iob.fd, pkt, size);
         if (wr == -1) {
@@ -277,26 +275,28 @@ bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept
                 delete this;
                 return false;
             }
-            if (errno != EAGAIN && errno != EWOULDBLOCK) {
+            if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
                 // TODO log error
                 delete this;
                 return false;
             }
+            // EAGAIN etc: fall through to below
         }
         else {
             if ((unsigned)wr == size) {
                 // Ok, all written.
+                iob.setWatchFlags(in_flag);
                 return true;
             }
             pkt += wr;
             size -= wr;
         }
-        iob.setWatchFlags(in_events | out_events);
     }
     
     // Create a vector out of the (remaining part of the) packet:
     try {
         outbuf.emplace_back(pkt, pkt + size);
+        iob.setWatchFlags(in_flag | out_events);
         return true;
     }
     catch (std::bad_alloc &baexc) {
@@ -308,19 +308,20 @@ bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept
             // could above. Neither can we later send the response since we have currently
             // sent an incomplete packet. All we can do is close the connection.
             delete this;
+            return false;
         }
         else {
             iob.setWatchFlags(out_events);
+            return true;
         }
-        return false;    
     }
 }
 
-
+// This queuePacket method is frustratingly similar to the one above, but the subtle differences
+// make them extraordinary difficult to combine into a single method.
 bool ControlConn::queuePacket(std::vector<char> &&pkt) noexcept
 {
-    if (bad_conn_close) return false;
-
+    int in_flag = bad_conn_close ? 0 : in_events;
     bool was_empty = outbuf.empty();
     
     if (was_empty) {
@@ -332,24 +333,26 @@ bool ControlConn::queuePacket(std::vector<char> &&pkt) noexcept
                 delete this;
                 return false;
             }
-            if (errno != EAGAIN && errno != EWOULDBLOCK) {
+            if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
                 // TODO log error
                 delete this;
                 return false;
             }
+            // EAGAIN etc: fall through to below
         }
         else {
             if ((unsigned)wr == pkt.size()) {
                 // Ok, all written.
+                iob.setWatchFlags(in_flag);
                 return true;
             }
             outpkt_index = wr;
         }
-        iob.setWatchFlags(in_events | out_events);
     }
     
     try {
         outbuf.emplace_back(pkt);
+        iob.setWatchFlags(in_flag | out_events);
         return true;
     }
     catch (std::bad_alloc &baexc) {
@@ -361,11 +364,12 @@ bool ControlConn::queuePacket(std::vector<char> &&pkt) noexcept
             // could above. Neither can we later send the response since we have currently
             // sent an incomplete packet. All we can do is close the connection.
             delete this;
+            return false;
         }
         else {
             iob.setWatchFlags(out_events);
+            return true;
         }
-        return false;
     }
 }
 
@@ -399,25 +403,29 @@ bool ControlConn::dataReady() noexcept
     // complete packet?
     if (rbuf.get_length() >= chklen) {
         try {
-            processPacket();
+            return processPacket();
         }
         catch (std::bad_alloc &baexc) {
             doOomClose();
+            return false;
         }
     }
-    
-    if (rbuf.get_length() == 1024) {
+    else if (rbuf.get_length() == 1024) {
         // Too big packet
         // TODO log error?
         // TODO error response?
         bad_conn_close = true;
         iob.setWatchFlags(out_events);
     }
+    else {
+        int out_flags = (bad_conn_close || !outbuf.empty()) ? out_events : 0;
+        iob.setWatchFlags(in_events | out_flags);
+    }
     
     return false;
 }
 
-void ControlConn::sendData() noexcept
+bool ControlConn::sendData() noexcept
 {
     if (outbuf.empty() && bad_conn_close) {
         if (oom_close) {
@@ -426,7 +434,7 @@ void ControlConn::sendData() noexcept
             write(iob.fd, oomBuf, 1);
         }
         delete this;
-        return;
+        return true;
     }
     
     vector<char> & pkt = outbuf.front();
@@ -436,15 +444,17 @@ void ControlConn::sendData() noexcept
         if (errno == EPIPE) {
             // read end closed
             delete this;
+            return true;
         }
-        else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
             // spurious readiness notification?
         }
         else {
             log(LogLevel::ERROR, "Error writing to control connection: ", strerror(errno));
             delete this;
+            return true;
         }
-        return;
+        return false;
     }
 
     outpkt_index += written;
@@ -458,9 +468,12 @@ void ControlConn::sendData() noexcept
             }
             else {
                 delete this;
+                return true;
             }
         }
     }
+    
+    return false;
 }
 
 ControlConn::~ControlConn() noexcept
index 943f7b98804f09abacf0673371f99505f438dc2d..260c10d107c51a979ec7bee27dd5e2d94d8dc66d 100644 (file)
@@ -25,7 +25,7 @@ class ControlConn;
 class ControlConnWatcher;
 
 // forward-declaration of callback:
-static void control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
+static dasync::Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
 
 // Pointer to the control connection that is listening for rollback completion
 extern ControlConn * rollback_handler_conn;
@@ -64,33 +64,29 @@ class ControlConnWatcher : public PosixBidiFdWatcher<NullMutex>
     public:
     int fd; // TODO this is already stored, find a better way to access it.
     EventLoop_t * eventLoop;
-    int watch_flags;
     
-    void setWatchFlags(int flags) noexcept
+    void setWatchFlags(int flags)
     {
         PosixBidiFdWatcher<NullMutex>::setWatchFlags(eventLoop, flags);
-        watch_flags = flags;
     }
     
     void registerWith(EventLoop_t *loop, int fd, int flags)
     {
         this->fd = fd;
+        this->eventLoop = loop;
         PosixBidiFdWatcher<NullMutex>::registerWith(loop, fd, flags);
-        watch_flags = flags;
     }
 };
 
 inline Rearm ControlConnWatcher::receiveEvent(EventLoop_t * loop, int fd, int flags) noexcept
 {
-    PosixBidiFdWatcher<NullMutex>::setWatchFlags(loop, watch_flags);
-    control_conn_cb(loop, this, flags);
-    return Rearm::NOOP;
+    return control_conn_cb(loop, this, flags);
 }
 
 
 class ControlConn : private ServiceListener
 {
-    friend void control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
+    friend dasync::Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
     
     ControlConnWatcher iob;
     EventLoop_t *loop;
@@ -121,32 +117,36 @@ class ControlConn : private ServiceListener
     unsigned outpkt_index = 0;
     
     // Queue a packet to be sent
-    //  Returns:  true if the packet was successfully queued, false if otherwise
-    //            (eg if out of memory); in the latter case the connection might
-    //            no longer be valid (iff there are no outgoing packets queued).
+    //  Returns:  false if the packet could not be queued and the connection has been
+    //              destroyed;
+    //            true (with bad_conn_close == false) if the packet was successfully
+    //              queued;
+    //            true (with bad_conn_close == true) if the packet was not successfully
+    //              queued.
+    // The in/out watch enabled state will also be set appropriately.
     bool queuePacket(vector<char> &&v) noexcept;
     bool queuePacket(const char *pkt, unsigned size) noexcept;
 
     // Process a packet. Can cause the ControlConn to be deleted iff there are no
-    // outgoing packets queued.
+    // outgoing packets queued (returns false).
     // Throws:
     //    std::bad_alloc - if an out-of-memory condition prevents processing
-    void processPacket();
+    bool processPacket();
     
     // Process a STARTSERVICE/STOPSERVICE packet. May throw std::bad_alloc.
-    void processStartStop(int pktType);
+    bool processStartStop(int pktType);
     
     // Process a FINDSERVICE/LOADSERVICE packet. May throw std::bad_alloc.
-    void processFindLoad(int pktType);
+    bool processFindLoad(int pktType);
 
     // Process an UNPINSERVICE packet. May throw std::bad_alloc.
-    void processUnpinService();
+    bool processUnpinService();
 
-    // Notify that data is ready to be read from the socket. Returns true in cases where the
-    // connection was deleted with potentially pending outgoing packets.
+    // Notify that data is ready to be read from the socket. Returns true if the connection was
+    // deleted.
     bool dataReady() noexcept;
     
-    void sendData() noexcept;
+    bool sendData() noexcept;
     
     // Allocate a new handle for a service; may throw std::bad_alloc
     handle_t allocateServiceHandle(ServiceRecord *record);
@@ -211,19 +211,25 @@ class ControlConn : private ServiceListener
 };
 
 
-static void control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents)
+static dasync::Rearm control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents)
 {
+    using Rearm = dasync::Rearm;
+
     char * cc_addr = (reinterpret_cast<char *>(watcher)) - offsetof(ControlConn, iob);
     ControlConn *conn = reinterpret_cast<ControlConn *>(cc_addr);
     if (revents & in_events) {
         if (conn->dataReady()) {
             // ControlConn was deleted
-            return;
+            return Rearm::REMOVED;
         }
     }
     if (revents & out_events) {
-        conn->sendData();
-    }    
+        if (conn->sendData()) {
+            return Rearm::REMOVED;
+        }
+    }
+    
+    return Rearm::NOOP;
 }
 
 #endif