Add function to queue outgoing packet to a control connection.
authorDavin McCall <davmac@davmac.org>
Sat, 21 Nov 2015 11:29:35 +0000 (11:29 +0000)
committerDavin McCall <davmac@davmac.org>
Sat, 21 Nov 2015 11:29:35 +0000 (11:29 +0000)
control.cc
control.h

index eefca49d2a0c65a0ebebdb97a8ed9133e98b83d4..e34f00837b346ad03aa55245670961eeea5d10f5 100644 (file)
@@ -1,6 +1,11 @@
 #include "control.h"
 #include "service.h"
 
+// TODO at the moment we rely on the readiness notification to send "bad packet" responses.
+//   It would probably be better to, if the outgoing buffer is empty, try and send the packet
+//   immediately and only queue it if necessary. However this means we would potentially
+//   delete 'this' object which needs to be accounted for in calling methods. (Might be better
+//   to return a bool indicating that delete is required).
 void ControlConn::processPacket()
 {
     using std::string;
@@ -15,15 +20,17 @@ void ControlConn::processPacket()
         uint16_t svcSize;
         memcpy(&svcSize, iobuf + 1, 2);
         if (svcSize <= 0) {
-            // TODO error response
-            bufidx = 1024; // dataReady will delete - TODO clean up
+            // TODO queue error response
+            bad_conn_close = true;
+            ev_io_set(&iob, iob.fd, EV_WRITE);
         }
         
         chklen = svcSize + 3;
         if (chklen > 1024) {
             // We can't have a service name this long
             // TODO error response
-            bufidx = 1024; // TODO cleanup.
+            bad_conn_close = true;
+            ev_io_set(&iob, iob.fd, EV_WRITE);
         }
         
         if (bufidx < chklen) {
@@ -72,16 +79,42 @@ void ControlConn::processPacket()
     }
 }
 
+bool ControlConn::queuePacket(std::vector<char> &&pkt) noexcept
+{
+    bool was_empty = outbuf.empty();
+    try {
+        outbuf.emplace_back(pkt);
+        if (was_empty) {
+            ev_io_set(&iob, iob.fd, EV_READ | EV_WRITE);
+        }
+        return true;
+    }
+    catch (std::bad_alloc &baexc) {
+        // Mark the connection bad, and stop reading further requests
+        bad_conn_close = true;
+        oom_close = true;
+        if (was_empty) {
+            // TODO send out-of-memory response
+            delete this;
+        }
+        else {
+            ev_io_set(&iob, iob.fd, EV_WRITE);
+        }
+        return false;
+    }
+}
+
 void ControlConn::rollbackComplete() noexcept
 {
     char ackBuf[1] = { DINIT_RP_COMPLETED };
+    // TODO Queue response instead of trying to write it directly like this
     if (write(iob.fd, ackBuf, 1) == -1) {
-        // TODO queue or at least re-try if error or 0 bytes written.
         log(LogLevel::ERROR, "Couldn't write response to control socket");
+        delete this;
     }
 }
 
-void ControlConn::dataReady()
+void ControlConn::dataReady() noexcept
 {
     int fd = iob.fd;
     int buffree = 1024 - bufidx;
@@ -108,14 +141,67 @@ void ControlConn::dataReady()
     
     // complete packet?
     if (bufidx >= chklen) {
-        processPacket();
+        try {
+            processPacket();
+        }
+        catch (std::bad_alloc &baexc) {
+            // TODO
+        }
     }
     
     if (bufidx == 1024) {
         // Too big packet
         // TODO log error?
         // TODO error response?
+        bad_conn_close = true;
+        ev_io_set(&iob, iob.fd, EV_WRITE);
+    }
+}
+
+void ControlConn::sendData() noexcept
+{
+    if (outbuf.empty() && bad_conn_close) {
+        if (oom_close) {
+            // TODO send oom response
+        }
         delete this;
+        return;
+    }
+    
+    vector<char> & pkt = outbuf.front();
+    char *data = pkt.data();
+    int written = write(iob.fd, data + outpkt_index, pkt.size() - outpkt_index);
+    if (written == -1) {
+        if (errno == EPIPE) {
+            // read end closed
+            delete this;
+        }
+        else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+            // spurious readiness notification?
+        }
+        else {
+            // TODO log error
+            delete this;
+        }
+        return;
+    }
+
+    outpkt_index += written;
+    if (outpkt_index == pkt.size()) {
+        // We've finished this packet, move on to the next:
+        outbuf.pop_front();
+        outpkt_index = 0;
+        if (outbuf.empty()) {
+            if (! bad_conn_close) {
+                ev_io_set(&iob, iob.fd, EV_READ);
+            }
+            else {
+                if (oom_close) {
+                    // TODO send out-of-memory reply if possible
+                }
+                delete this;
+            }
+        }
     }
 }
 
index 90c5f0db7fefd5943fc4d7fa1bf39c13b9a2f796..b5719a850e61320e11a38afd77d12d8e16cd515e 100644 (file)
--- a/control.h
+++ b/control.h
@@ -1,6 +1,9 @@
 #ifndef DINIT_CONTROL_H
 #define DINIT_CONTROL_H
 
+#include <list>
+#include <vector>
+
 #include <unistd.h>
 #include <ev++.h>
 #include "dinit-log.h"
@@ -37,9 +40,25 @@ class ControlConn
     char * iobuf;
     int bufidx;
     
+    bool bad_conn_close; // close when finished output?
+    bool oom_close;      // send final 'out of memory' indicator
+    
+    template <typename T> using list = std::list<T>;
+    template <typename T> using vector = std::vector<T>;
+    
+    // Buffer for outgoing packets. Each outgoing back is represented as a vector<char>.
+    list<vector<char>> outbuf;
+    unsigned outpkt_index = 0;
+    
     // The packet length before we need to re-check if the packet is complete
     int chklen;
     
+    // Queue a packet to be sent
+    //  Returns:  true if the packet was successfully queued, false if otherwise
+    //            (eg if out of memory); in the latter case the connection might
+    //            no longer be valid
+    bool queuePacket(vector<char> &&v) noexcept;
+    
     public:
     ControlConn(struct ev_loop * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), bufidx(0), chklen(0)
     {
@@ -54,7 +73,9 @@ class ControlConn
     
     void processPacket();
     void rollbackComplete() noexcept;
-    void dataReady();
+    void dataReady() noexcept;
+    void sendData() noexcept;
+    
     
     ~ControlConn() noexcept;
 };
@@ -63,7 +84,13 @@ class ControlConn
 static void control_conn_cb(struct ev_loop * loop, ev_io * w, int revents)
 {
     ControlConn *conn = (ControlConn *) w->data;
-    conn->dataReady();
+    if (revents & EV_READ) {
+        conn->dataReady();
+    }
+    // TODO issu ehere: what if above deletes the connection?
+    if (revents & EV_WRITE) {
+        conn->sendData();
+    }    
 }
 
 #endif