2efe73128e56c2d65a3ad4100f55950b68978718
[oweals/dinit.git] / src / control.cc
1 #include "control.h"
2 #include "service.h"
3
4 void ControlConn::processPacket()
5 {
6     using std::string;
7     
8     // Note that where we call queuePacket, we must generally check the return value. If it
9     // returns false it has either deleted the connection or marked it for deletion; we
10     // shouldn't touch instance members after that point.
11
12     int pktType = rbuf[0];
13     if (pktType == DINIT_CP_QUERYVERSION) {
14         // Responds with:
15         // DINIT_RP_CVERSION, (2 byte) minimum compatible version, (2 byte) maximum compatible version
16         char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 };
17         if (! queuePacket(replyBuf, 1)) return;
18         rbuf.consume(1);
19         return;
20     }
21     if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) {
22         processFindLoad(pktType);
23         return;
24     }
25     if (pktType == DINIT_CP_STARTSERVICE || pktType == DINIT_CP_STOPSERVICE
26             || pktType == DINIT_CP_WAKESERVICE || pktType == DINIT_CP_RELEASESERVICE) {
27         processStartStop(pktType);
28         return;
29     }
30     else if (pktType == DINIT_CP_SHUTDOWN) {
31         // Shutdown/reboot
32         if (rbuf.get_length() < 2) {
33             chklen = 2;
34             return;
35         }
36         
37         auto sd_type = static_cast<ShutdownType>(rbuf[1]);
38         
39         service_set->stop_all_services(sd_type);
40         log_to_console = true;
41         char ackBuf[] = { DINIT_RP_ACK };
42         if (! queuePacket(ackBuf, 1)) return;
43         
44         // Clear the packet from the buffer
45         rbuf.consume(2);
46         chklen = 0;
47         return;
48     }
49     else {
50         // Unrecognized: give error response
51         char outbuf[] = { DINIT_RP_BADREQ };
52         if (! queuePacket(outbuf, 1)) return;
53         bad_conn_close = true;
54         ev_io_set(&iob, iob.fd, EV_WRITE);
55     }
56     return;
57 }
58
59 void ControlConn::processFindLoad(int pktType)
60 {
61     using std::string;
62     
63     constexpr int pkt_size = 4;
64     
65     if (rbuf.get_length() < pkt_size) {
66         chklen = pkt_size;
67         return;
68     }
69     
70     uint16_t svcSize;
71     rbuf.extract((char *)&svcSize, 1, 2);
72     chklen = svcSize + 3; // packet type + (2 byte) length + service name
73     if (svcSize <= 0 || chklen > 1024) {
74         // Queue error response / mark connection bad
75         char badreqRep[] = { DINIT_RP_BADREQ };
76         if (! queuePacket(badreqRep, 1)) return;
77         bad_conn_close = true;
78         ev_io_set(&iob, iob.fd, EV_WRITE);
79         return;
80     }
81     
82     if (rbuf.get_length() < chklen) {
83         // packet not complete yet; read more
84         return;
85     }
86     
87     ServiceRecord * record = nullptr;
88     
89     string serviceName = std::move(rbuf.extract_string(3, svcSize));
90     
91     if (pktType == DINIT_CP_LOADSERVICE) {
92         // LOADSERVICE
93         try {
94             record = service_set->loadService(serviceName);
95         }
96         catch (ServiceLoadExc &slexc) {
97             log(LogLevel::ERROR, "Could not load service ", slexc.serviceName, ": ", slexc.excDescription);
98         }
99     }
100     else {
101         // FINDSERVICE
102         record = service_set->findService(serviceName.c_str());
103     }
104     
105     if (record != nullptr) {
106         // Allocate a service handle
107         handle_t handle = allocateServiceHandle(record);
108         std::vector<char> rp_buf;
109         rp_buf.reserve(7);
110         rp_buf.push_back(DINIT_RP_SERVICERECORD);
111         rp_buf.push_back(static_cast<char>(record->getState()));
112         for (int i = 0; i < (int) sizeof(handle); i++) {
113             rp_buf.push_back(*(((char *) &handle) + i));
114         }
115         rp_buf.push_back(static_cast<char>(record->getTargetState()));
116         if (! queuePacket(std::move(rp_buf))) return;
117     }
118     else {
119         std::vector<char> rp_buf = { DINIT_RP_NOSERVICE };
120         if (! queuePacket(std::move(rp_buf))) return;
121     }
122     
123     // Clear the packet from the buffer
124     rbuf.consume(chklen);
125     chklen = 0;
126     return;
127 }
128
129 void ControlConn::processStartStop(int pktType)
130 {
131     using std::string;
132     
133     constexpr int pkt_size = 2 + sizeof(handle_t);
134     
135     if (rbuf.get_length() < pkt_size) {
136         chklen = pkt_size;
137         return;
138     }
139     
140     // 1 byte: packet type
141     // 1 byte: pin in requested state (0 = no pin, 1 = pin)
142     // 4 bytes: service handle
143     
144     bool do_pin = (rbuf[1] == 1);
145     handle_t handle;
146     rbuf.extract((char *) &handle, 2, sizeof(handle));
147     
148     ServiceRecord *service = findServiceForKey(handle);
149     if (service == nullptr) {
150         // Service handle is bad
151         char badreqRep[] = { DINIT_RP_BADREQ };
152         if (! queuePacket(badreqRep, 1)) return;
153         bad_conn_close = true;
154         ev_io_set(&iob, iob.fd, EV_WRITE);
155         return;
156     }
157     else {
158         bool already_there = false;
159         switch (pktType) {
160         case DINIT_CP_STARTSERVICE:
161             if (do_pin) service->pinStart();
162             service->start();
163             already_there = service->getState() == ServiceState::STARTED;
164             break;
165         case DINIT_CP_STOPSERVICE:
166             if (do_pin) service->pinStop();
167             service->stop();
168             already_there = service->getState() == ServiceState::STOPPED;
169             break;
170         case DINIT_CP_WAKESERVICE:
171             // re-start a stopped service.
172             if (do_pin) service->pinStart();
173             service->start(false);
174             already_there = service->getState() == ServiceState::STARTED;
175             break;
176         default: /* DINIT_CP_RELEASESERVICE */
177             // remove explicit start from a service, without necessarily stopping it.
178             // TODO.
179             break;
180         }
181         
182         char ack_buf[] = { (char)(already_there ? DINIT_RP_ALREADYSS : DINIT_RP_ACK) };
183         
184         if (! queuePacket(ack_buf, 1)) return;
185     }
186     
187     // Clear the packet from the buffer
188     rbuf.consume(pkt_size);
189     chklen = 0;
190     return;
191 }
192
193 ControlConn::handle_t ControlConn::allocateServiceHandle(ServiceRecord *record)
194 {
195     bool is_unique = true;
196     handle_t largest_seen = 0;
197     handle_t candidate = 0;
198     for (auto p : keyServiceMap) {
199         if (p.first > largest_seen) largest_seen = p.first;
200         if (p.first == candidate) {
201             if (largest_seen == std::numeric_limits<handle_t>::max()) throw std::bad_alloc();
202             candidate = largest_seen + 1;
203         }
204         is_unique &= (p.second != record);
205     }
206     
207     keyServiceMap[candidate] = record;
208     serviceKeyMap.insert(std::make_pair(record, candidate));
209     
210     if (is_unique) {
211         record->addListener(this);
212     }
213     
214     return candidate;
215 }
216
217
218 bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept
219 {
220     if (bad_conn_close) return false;
221
222     bool was_empty = outbuf.empty();
223
224     if (was_empty) {
225         int wr = write(iob.fd, pkt, size);
226         if (wr == -1) {
227             if (errno == EPIPE) {
228                 delete this;
229                 return false;
230             }
231             if (errno != EAGAIN && errno != EWOULDBLOCK) {
232                 // TODO log error
233                 delete this;
234                 return false;
235             }
236         }
237         else {
238             if ((unsigned)wr == size) {
239                 // Ok, all written.
240                 return true;
241             }
242             pkt += wr;
243             size -= wr;
244         }
245         ev_io_set(&iob, iob.fd, EV_READ | EV_WRITE);
246     }
247     
248     // Create a vector out of the (remaining part of the) packet:
249     try {
250         outbuf.emplace_back(pkt, pkt + size);
251         return true;
252     }
253     catch (std::bad_alloc &baexc) {
254         // Mark the connection bad, and stop reading further requests
255         bad_conn_close = true;
256         oom_close = true;
257         if (was_empty) {
258             // We can't send out-of-memory response as we already wrote as much as we
259             // could above. Neither can we later send the response since we have currently
260             // sent an incomplete packet. All we can do is close the connection.
261             delete this;
262         }
263         else {
264             ev_io_set(&iob, iob.fd, EV_WRITE);
265         }
266         return false;    
267     }
268 }
269
270
271 bool ControlConn::queuePacket(std::vector<char> &&pkt) noexcept
272 {
273     if (bad_conn_close) return false;
274
275     bool was_empty = outbuf.empty();
276     
277     if (was_empty) {
278         outpkt_index = 0;
279         // We can try sending the packet immediately:
280         int wr = write(iob.fd, pkt.data(), pkt.size());
281         if (wr == -1) {
282             if (errno == EPIPE) {
283                 delete this;
284                 return false;
285             }
286             if (errno != EAGAIN && errno != EWOULDBLOCK) {
287                 // TODO log error
288                 delete this;
289                 return false;
290             }
291         }
292         else {
293             if ((unsigned)wr == pkt.size()) {
294                 // Ok, all written.
295                 return true;
296             }
297             outpkt_index = wr;
298         }
299         ev_io_set(&iob, iob.fd, EV_READ | EV_WRITE);
300     }
301     
302     try {
303         outbuf.emplace_back(pkt);
304         return true;
305     }
306     catch (std::bad_alloc &baexc) {
307         // Mark the connection bad, and stop reading further requests
308         bad_conn_close = true;
309         oom_close = true;
310         if (was_empty) {
311             // We can't send out-of-memory response as we already wrote as much as we
312             // could above. Neither can we later send the response since we have currently
313             // sent an incomplete packet. All we can do is close the connection.
314             delete this;
315         }
316         else {
317             ev_io_set(&iob, iob.fd, EV_WRITE);
318         }
319         return false;
320     }
321 }
322
323 bool ControlConn::rollbackComplete() noexcept
324 {
325     char ackBuf[2] = { DINIT_ROLLBACK_COMPLETED, 2 };
326     return queuePacket(ackBuf, 2);
327 }
328
329 bool ControlConn::dataReady() noexcept
330 {
331     int fd = iob.fd;
332     
333     int r = rbuf.fill(fd);
334     
335     // Note file descriptor is non-blocking
336     if (r == -1) {
337         if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
338             // TODO log error
339             delete this;
340             return true;
341         }
342         return false;
343     }
344     
345     if (r == 0) {
346         delete this;
347         return true;
348     }
349     
350     // complete packet?
351     if (rbuf.get_length() >= chklen) {
352         try {
353             processPacket();
354         }
355         catch (std::bad_alloc &baexc) {
356             doOomClose();
357         }
358     }
359     
360     if (rbuf.get_length() == 1024) {
361         // Too big packet
362         // TODO log error?
363         // TODO error response?
364         bad_conn_close = true;
365         ev_io_set(&iob, iob.fd, EV_WRITE);
366     }
367     
368     return false;
369 }
370
371 void ControlConn::sendData() noexcept
372 {
373     if (outbuf.empty() && bad_conn_close) {
374         if (oom_close) {
375             // Send oom response
376             char oomBuf[] = { DINIT_RP_OOM };
377             write(iob.fd, oomBuf, 1);
378         }
379         delete this;
380         return;
381     }
382     
383     vector<char> & pkt = outbuf.front();
384     char *data = pkt.data();
385     int written = write(iob.fd, data + outpkt_index, pkt.size() - outpkt_index);
386     if (written == -1) {
387         if (errno == EPIPE) {
388             // read end closed
389             delete this;
390         }
391         else if (errno == EAGAIN || errno == EWOULDBLOCK) {
392             // spurious readiness notification?
393         }
394         else {
395             log(LogLevel::ERROR, "Error writing to control connection: ", strerror(errno));
396             delete this;
397         }
398         return;
399     }
400
401     outpkt_index += written;
402     if (outpkt_index == pkt.size()) {
403         // We've finished this packet, move on to the next:
404         outbuf.pop_front();
405         outpkt_index = 0;
406         if (outbuf.empty() && ! oom_close) {
407             if (! bad_conn_close) {
408                 ev_io_set(&iob, iob.fd, EV_READ);
409             }
410             else {
411                 delete this;
412             }
413         }
414     }
415 }
416
417 ControlConn::~ControlConn() noexcept
418 {
419     close(iob.fd);
420     ev_io_stop(loop, &iob);
421     
422     // Clear service listeners
423     for (auto p : serviceKeyMap) {
424         p.first->removeListener(this);
425     }
426     
427     active_control_conns--;
428 }