#include "control.h"
#include "service.h"
-void ControlConn::processPacket()
+bool ControlConn::processPacket()
{
using std::string;
// Responds with:
// DINIT_RP_CVERSION, (2 byte) minimum compatible version, (2 byte) maximum compatible version
char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 };
- if (! queuePacket(replyBuf, 1)) return;
+ if (! queuePacket(replyBuf, 1)) return false;
rbuf.consume(1);
- return;
+ return true;
}
if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) {
- processFindLoad(pktType);
- return;
+ return processFindLoad(pktType);
}
if (pktType == DINIT_CP_STARTSERVICE || pktType == DINIT_CP_STOPSERVICE
|| pktType == DINIT_CP_WAKESERVICE || pktType == DINIT_CP_RELEASESERVICE) {
- processStartStop(pktType);
- return;
+ return processStartStop(pktType);
}
if (pktType == DINIT_CP_UNPINSERVICE) {
- processUnpinService();
- return;
+ return processUnpinService();
}
if (pktType == DINIT_CP_SHUTDOWN) {
// Shutdown/reboot
if (rbuf.get_length() < 2) {
chklen = 2;
- return;
+ return true;
}
auto sd_type = static_cast<ShutdownType>(rbuf[1]);
service_set->stop_all_services(sd_type);
char ackBuf[] = { DINIT_RP_ACK };
- if (! queuePacket(ackBuf, 1)) return;
+ if (! queuePacket(ackBuf, 1)) return false;
// Clear the packet from the buffer
rbuf.consume(2);
chklen = 0;
- return;
+ return true;
}
else {
// Unrecognized: give error response
char outbuf[] = { DINIT_RP_BADREQ };
- if (! queuePacket(outbuf, 1)) return;
+ if (! queuePacket(outbuf, 1)) return false;
bad_conn_close = true;
iob.setWatchFlags(out_events);
}
- return;
+ return true;
}
-void ControlConn::processFindLoad(int pktType)
+bool ControlConn::processFindLoad(int pktType)
{
using std::string;
if (rbuf.get_length() < pkt_size) {
chklen = pkt_size;
- return;
+ return true;
}
uint16_t svcSize;
if (svcSize <= 0 || chklen > 1024) {
// Queue error response / mark connection bad
char badreqRep[] = { DINIT_RP_BADREQ };
- if (! queuePacket(badreqRep, 1)) return;
+ if (! queuePacket(badreqRep, 1)) return false;
bad_conn_close = true;
iob.setWatchFlags(out_events);
- return;
+ return true;
}
if (rbuf.get_length() < chklen) {
// packet not complete yet; read more
- return;
+ return true;
}
ServiceRecord * record = nullptr;
rp_buf.push_back(*(((char *) &handle) + i));
}
rp_buf.push_back(static_cast<char>(record->getTargetState()));
- if (! queuePacket(std::move(rp_buf))) return;
+ if (! queuePacket(std::move(rp_buf))) return false;
}
else {
std::vector<char> rp_buf = { DINIT_RP_NOSERVICE };
- if (! queuePacket(std::move(rp_buf))) return;
+ if (! queuePacket(std::move(rp_buf))) return false;
}
// Clear the packet from the buffer
rbuf.consume(chklen);
chklen = 0;
- return;
+ return true;
}
-void ControlConn::processStartStop(int pktType)
+bool ControlConn::processStartStop(int pktType)
{
using std::string;
if (rbuf.get_length() < pkt_size) {
chklen = pkt_size;
- return;
+ return true;
}
// 1 byte: packet type
if (service == nullptr) {
// Service handle is bad
char badreqRep[] = { DINIT_RP_BADREQ };
- if (! queuePacket(badreqRep, 1)) return;
+ if (! queuePacket(badreqRep, 1)) return false;
bad_conn_close = true;
iob.setWatchFlags(out_events);
- return;
+ return true;
}
else {
bool already_there = false;
char ack_buf[] = { (char)(already_there ? DINIT_RP_ALREADYSS : DINIT_RP_ACK) };
- if (! queuePacket(ack_buf, 1)) return;
+ if (! queuePacket(ack_buf, 1)) return false;
}
// Clear the packet from the buffer
rbuf.consume(pkt_size);
chklen = 0;
- return;
+ return true;
}
-void ControlConn::processUnpinService()
+bool ControlConn::processUnpinService()
{
using std::string;
if (rbuf.get_length() < pkt_size) {
chklen = pkt_size;
- return;
+ return true;
}
// 1 byte: packet type
if (service == nullptr) {
// Service handle is bad
char badreqRep[] = { DINIT_RP_BADREQ };
- if (! queuePacket(badreqRep, 1)) return;
+ if (! queuePacket(badreqRep, 1)) return false;
bad_conn_close = true;
iob.setWatchFlags(out_events);
- return;
+ return true;
}
else {
service->unpin();
char ack_buf[] = { (char) DINIT_RP_ACK };
- if (! queuePacket(ack_buf, 1)) return;
+ if (! queuePacket(ack_buf, 1)) return false;
}
// Clear the packet from the buffer
rbuf.consume(pkt_size);
chklen = 0;
- return;
+ return true;
}
ControlConn::handle_t ControlConn::allocateServiceHandle(ServiceRecord *record)
bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept
{
- if (bad_conn_close) return false;
-
+ int in_flag = bad_conn_close ? 0 : in_events;
bool was_empty = outbuf.empty();
+ // If the queue is empty, we can try to write the packet out now rather than queueing it.
+ // If the write is unsuccessful or partial, we queue the remainder.
if (was_empty) {
int wr = write(iob.fd, pkt, size);
if (wr == -1) {
delete this;
return false;
}
- if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
// TODO log error
delete this;
return false;
}
+ // EAGAIN etc: fall through to below
}
else {
if ((unsigned)wr == size) {
// Ok, all written.
+ iob.setWatchFlags(in_flag);
return true;
}
pkt += wr;
size -= wr;
}
- iob.setWatchFlags(in_events | out_events);
}
// Create a vector out of the (remaining part of the) packet:
try {
outbuf.emplace_back(pkt, pkt + size);
+ iob.setWatchFlags(in_flag | out_events);
return true;
}
catch (std::bad_alloc &baexc) {
// could above. Neither can we later send the response since we have currently
// sent an incomplete packet. All we can do is close the connection.
delete this;
+ return false;
}
else {
iob.setWatchFlags(out_events);
+ return true;
}
- return false;
}
}
-
+// This queuePacket method is frustratingly similar to the one above, but the subtle differences
+// make them extraordinary difficult to combine into a single method.
bool ControlConn::queuePacket(std::vector<char> &&pkt) noexcept
{
- if (bad_conn_close) return false;
-
+ int in_flag = bad_conn_close ? 0 : in_events;
bool was_empty = outbuf.empty();
if (was_empty) {
delete this;
return false;
}
- if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
// TODO log error
delete this;
return false;
}
+ // EAGAIN etc: fall through to below
}
else {
if ((unsigned)wr == pkt.size()) {
// Ok, all written.
+ iob.setWatchFlags(in_flag);
return true;
}
outpkt_index = wr;
}
- iob.setWatchFlags(in_events | out_events);
}
try {
outbuf.emplace_back(pkt);
+ iob.setWatchFlags(in_flag | out_events);
return true;
}
catch (std::bad_alloc &baexc) {
// could above. Neither can we later send the response since we have currently
// sent an incomplete packet. All we can do is close the connection.
delete this;
+ return false;
}
else {
iob.setWatchFlags(out_events);
+ return true;
}
- return false;
}
}
// complete packet?
if (rbuf.get_length() >= chklen) {
try {
- processPacket();
+ return processPacket();
}
catch (std::bad_alloc &baexc) {
doOomClose();
+ return false;
}
}
-
- if (rbuf.get_length() == 1024) {
+ else if (rbuf.get_length() == 1024) {
// Too big packet
// TODO log error?
// TODO error response?
bad_conn_close = true;
iob.setWatchFlags(out_events);
}
+ else {
+ int out_flags = (bad_conn_close || !outbuf.empty()) ? out_events : 0;
+ iob.setWatchFlags(in_events | out_flags);
+ }
return false;
}
-void ControlConn::sendData() noexcept
+bool ControlConn::sendData() noexcept
{
if (outbuf.empty() && bad_conn_close) {
if (oom_close) {
write(iob.fd, oomBuf, 1);
}
delete this;
- return;
+ return true;
}
vector<char> & pkt = outbuf.front();
if (errno == EPIPE) {
// read end closed
delete this;
+ return true;
}
- else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
// spurious readiness notification?
}
else {
log(LogLevel::ERROR, "Error writing to control connection: ", strerror(errno));
delete this;
+ return true;
}
- return;
+ return false;
}
outpkt_index += written;
}
else {
delete this;
+ return true;
}
}
}
+
+ return false;
}
ControlConn::~ControlConn() noexcept
class ControlConnWatcher;
// forward-declaration of callback:
-static void control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
+static dasync::Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
// Pointer to the control connection that is listening for rollback completion
extern ControlConn * rollback_handler_conn;
public:
int fd; // TODO this is already stored, find a better way to access it.
EventLoop_t * eventLoop;
- int watch_flags;
- void setWatchFlags(int flags) noexcept
+ void setWatchFlags(int flags)
{
PosixBidiFdWatcher<NullMutex>::setWatchFlags(eventLoop, flags);
- watch_flags = flags;
}
void registerWith(EventLoop_t *loop, int fd, int flags)
{
this->fd = fd;
+ this->eventLoop = loop;
PosixBidiFdWatcher<NullMutex>::registerWith(loop, fd, flags);
- watch_flags = flags;
}
};
inline Rearm ControlConnWatcher::receiveEvent(EventLoop_t * loop, int fd, int flags) noexcept
{
- PosixBidiFdWatcher<NullMutex>::setWatchFlags(loop, watch_flags);
- control_conn_cb(loop, this, flags);
- return Rearm::NOOP;
+ return control_conn_cb(loop, this, flags);
}
class ControlConn : private ServiceListener
{
- friend void control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
+ friend dasync::Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
ControlConnWatcher iob;
EventLoop_t *loop;
unsigned outpkt_index = 0;
// Queue a packet to be sent
- // Returns: true if the packet was successfully queued, false if otherwise
- // (eg if out of memory); in the latter case the connection might
- // no longer be valid (iff there are no outgoing packets queued).
+ // Returns: false if the packet could not be queued and the connection has been
+ // destroyed;
+ // true (with bad_conn_close == false) if the packet was successfully
+ // queued;
+ // true (with bad_conn_close == true) if the packet was not successfully
+ // queued.
+ // The in/out watch enabled state will also be set appropriately.
bool queuePacket(vector<char> &&v) noexcept;
bool queuePacket(const char *pkt, unsigned size) noexcept;
// Process a packet. Can cause the ControlConn to be deleted iff there are no
- // outgoing packets queued.
+ // outgoing packets queued (returns false).
// Throws:
// std::bad_alloc - if an out-of-memory condition prevents processing
- void processPacket();
+ bool processPacket();
// Process a STARTSERVICE/STOPSERVICE packet. May throw std::bad_alloc.
- void processStartStop(int pktType);
+ bool processStartStop(int pktType);
// Process a FINDSERVICE/LOADSERVICE packet. May throw std::bad_alloc.
- void processFindLoad(int pktType);
+ bool processFindLoad(int pktType);
// Process an UNPINSERVICE packet. May throw std::bad_alloc.
- void processUnpinService();
+ bool processUnpinService();
- // Notify that data is ready to be read from the socket. Returns true in cases where the
- // connection was deleted with potentially pending outgoing packets.
+ // Notify that data is ready to be read from the socket. Returns true if the connection was
+ // deleted.
bool dataReady() noexcept;
- void sendData() noexcept;
+ bool sendData() noexcept;
// Allocate a new handle for a service; may throw std::bad_alloc
handle_t allocateServiceHandle(ServiceRecord *record);
};
-static void control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents)
+static dasync::Rearm control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents)
{
+ using Rearm = dasync::Rearm;
+
char * cc_addr = (reinterpret_cast<char *>(watcher)) - offsetof(ControlConn, iob);
ControlConn *conn = reinterpret_cast<ControlConn *>(cc_addr);
if (revents & in_events) {
if (conn->dataReady()) {
// ControlConn was deleted
- return;
+ return Rearm::REMOVED;
}
}
if (revents & out_events) {
- conn->sendData();
- }
+ if (conn->sendData()) {
+ return Rearm::REMOVED;
+ }
+ }
+
+ return Rearm::NOOP;
}
#endif