a3eaae282046794c4736c6543d533d2a1492cad7
[oweals/dinit.git] / src / control.cc
1 #include "control.h"
2 #include "service.h"
3
4 bool control_conn_t::processPacket()
5 {
6     using std::string;
7     
8     // Note that where we call queuePacket, we must generally check the return value. If it
9     // returns false it has either deleted the connection or marked it for deletion; we
10     // shouldn't touch instance members after that point.
11
12     int pktType = rbuf[0];
13     if (pktType == DINIT_CP_QUERYVERSION) {
14         // Responds with:
15         // DINIT_RP_CVERSION, (2 byte) minimum compatible version, (2 byte) maximum compatible version
16         char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 };
17         if (! queuePacket(replyBuf, 1)) return false;
18         rbuf.consume(1);
19         return true;
20     }
21     if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) {
22         return processFindLoad(pktType);
23     }
24     if (pktType == DINIT_CP_STARTSERVICE || pktType == DINIT_CP_STOPSERVICE
25             || pktType == DINIT_CP_WAKESERVICE || pktType == DINIT_CP_RELEASESERVICE) {
26         return processStartStop(pktType);
27     }
28     if (pktType == DINIT_CP_UNPINSERVICE) {
29         return processUnpinService();
30     }
31     if (pktType == DINIT_CP_SHUTDOWN) {
32         // Shutdown/reboot
33         if (rbuf.get_length() < 2) {
34             chklen = 2;
35             return true;
36         }
37         
38         auto sd_type = static_cast<shutdown_type_t>(rbuf[1]);
39         
40         services->stop_all_services(sd_type);
41         char ackBuf[] = { DINIT_RP_ACK };
42         if (! queuePacket(ackBuf, 1)) return false;
43         
44         // Clear the packet from the buffer
45         rbuf.consume(2);
46         chklen = 0;
47         return true;
48     }
49     if (pktType == DINIT_CP_LISTSERVICES) {
50         return listServices();
51     }
52     else {
53         // Unrecognized: give error response
54         char outbuf[] = { DINIT_RP_BADREQ };
55         if (! queuePacket(outbuf, 1)) return false;
56         bad_conn_close = true;
57         iob.set_watches(OUT_EVENTS);
58     }
59     return true;
60 }
61
62 bool control_conn_t::processFindLoad(int pktType)
63 {
64     using std::string;
65     
66     constexpr int pkt_size = 4;
67     
68     if (rbuf.get_length() < pkt_size) {
69         chklen = pkt_size;
70         return true;
71     }
72     
73     uint16_t svcSize;
74     rbuf.extract((char *)&svcSize, 1, 2);
75     chklen = svcSize + 3; // packet type + (2 byte) length + service name
76     if (svcSize <= 0 || chklen > 1024) {
77         // Queue error response / mark connection bad
78         char badreqRep[] = { DINIT_RP_BADREQ };
79         if (! queuePacket(badreqRep, 1)) return false;
80         bad_conn_close = true;
81         iob.set_watches(OUT_EVENTS);
82         return true;
83     }
84     
85     if (rbuf.get_length() < chklen) {
86         // packet not complete yet; read more
87         return true;
88     }
89     
90     service_record * record = nullptr;
91     
92     string serviceName = rbuf.extract_string(3, svcSize);
93     
94     if (pktType == DINIT_CP_LOADSERVICE) {
95         // LOADSERVICE
96         try {
97             record = services->load_service(serviceName.c_str());
98         }
99         catch (service_load_exc &slexc) {
100             log(loglevel_t::ERROR, "Could not load service ", slexc.serviceName, ": ", slexc.excDescription);
101         }
102     }
103     else {
104         // FINDSERVICE
105         record = services->find_service(serviceName.c_str());
106     }
107     
108     if (record != nullptr) {
109         // Allocate a service handle
110         handle_t handle = allocateServiceHandle(record);
111         std::vector<char> rp_buf;
112         rp_buf.reserve(7);
113         rp_buf.push_back(DINIT_RP_SERVICERECORD);
114         rp_buf.push_back(static_cast<char>(record->get_state()));
115         for (int i = 0; i < (int) sizeof(handle); i++) {
116             rp_buf.push_back(*(((char *) &handle) + i));
117         }
118         rp_buf.push_back(static_cast<char>(record->get_target_state()));
119         if (! queuePacket(std::move(rp_buf))) return false;
120     }
121     else {
122         std::vector<char> rp_buf = { DINIT_RP_NOSERVICE };
123         if (! queuePacket(std::move(rp_buf))) return false;
124     }
125     
126     // Clear the packet from the buffer
127     rbuf.consume(chklen);
128     chklen = 0;
129     return true;
130 }
131
132 bool control_conn_t::processStartStop(int pktType)
133 {
134     using std::string;
135     
136     constexpr int pkt_size = 2 + sizeof(handle_t);
137     
138     if (rbuf.get_length() < pkt_size) {
139         chklen = pkt_size;
140         return true;
141     }
142     
143     // 1 byte: packet type
144     // 1 byte: pin in requested state (0 = no pin, 1 = pin)
145     // 4 bytes: service handle
146     
147     bool do_pin = (rbuf[1] == 1);
148     handle_t handle;
149     rbuf.extract((char *) &handle, 2, sizeof(handle));
150     
151     service_record *service = findServiceForKey(handle);
152     if (service == nullptr) {
153         // Service handle is bad
154         char badreqRep[] = { DINIT_RP_BADREQ };
155         if (! queuePacket(badreqRep, 1)) return false;
156         bad_conn_close = true;
157         iob.set_watches(OUT_EVENTS);
158         return true;
159     }
160     else {
161         bool already_there = false;
162         
163         switch (pktType) {
164         case DINIT_CP_STARTSERVICE:
165             // start service, mark as required
166             if (do_pin) service->pin_start();
167             service->start();
168             services->process_queues();
169             already_there = service->get_state() == service_state_t::STARTED;
170             break;
171         case DINIT_CP_STOPSERVICE:
172             // force service to stop
173             if (do_pin) service->pin_stop();
174             service->stop(true);
175             service->forced_stop();
176             services->process_queues();
177             already_there = service->get_state() == service_state_t::STOPPED;
178             break;
179         case DINIT_CP_WAKESERVICE:
180             // re-start a stopped service (do not mark as required)
181             if (do_pin) service->pin_start();
182             service->start(false);
183             services->process_queues();
184             already_there = service->get_state() == service_state_t::STARTED;
185             break;
186         case DINIT_CP_RELEASESERVICE:
187             // remove required mark, stop if not required by dependents
188             if (do_pin) service->pin_stop();
189             service->stop(false);
190             services->process_queues();
191             already_there = service->get_state() == service_state_t::STOPPED;
192             break;
193         }
194         
195         char ack_buf[] = { (char)(already_there ? DINIT_RP_ALREADYSS : DINIT_RP_ACK) };
196         
197         if (! queuePacket(ack_buf, 1)) return false;
198     }
199     
200     // Clear the packet from the buffer
201     rbuf.consume(pkt_size);
202     chklen = 0;
203     return true;
204 }
205
206 bool control_conn_t::processUnpinService()
207 {
208     using std::string;
209     
210     constexpr int pkt_size = 1 + sizeof(handle_t);
211     
212     if (rbuf.get_length() < pkt_size) {
213         chklen = pkt_size;
214         return true;
215     }
216     
217     // 1 byte: packet type
218     // 4 bytes: service handle
219     
220     handle_t handle;
221     rbuf.extract((char *) &handle, 1, sizeof(handle));
222     
223     service_record *service = findServiceForKey(handle);
224     if (service == nullptr) {
225         // Service handle is bad
226         char badreqRep[] = { DINIT_RP_BADREQ };
227         if (! queuePacket(badreqRep, 1)) return false;
228         bad_conn_close = true;
229         iob.set_watches(OUT_EVENTS);
230         return true;
231     }
232     else {
233         service->unpin();
234         services->process_queues();
235         char ack_buf[] = { (char) DINIT_RP_ACK };
236         if (! queuePacket(ack_buf, 1)) return false;
237     }
238     
239     // Clear the packet from the buffer
240     rbuf.consume(pkt_size);
241     chklen = 0;
242     return true;
243 }
244
245 bool control_conn_t::listServices()
246 {
247     rbuf.consume(1); // clear request packet
248     chklen = 0;
249     
250     try {
251         auto slist = services->listServices();
252         for (auto sptr : slist) {
253             std::vector<char> pkt_buf;
254             
255             const std::string &name = sptr->get_service_name();
256             int nameLen = std::min((size_t)256, name.length());
257             pkt_buf.resize(8 + nameLen);
258             
259             pkt_buf[0] = DINIT_RP_SVCINFO;
260             pkt_buf[1] = nameLen;
261             pkt_buf[2] = static_cast<char>(sptr->get_state());
262             pkt_buf[3] = static_cast<char>(sptr->get_target_state());
263             
264             pkt_buf[4] = 0; // reserved
265             pkt_buf[5] = 0;
266             pkt_buf[6] = 0;
267             pkt_buf[7] = 0;
268             
269             for (int i = 0; i < nameLen; i++) {
270                 pkt_buf[8+i] = name[i];
271             }
272             
273             if (! queuePacket(std::move(pkt_buf))) return false;
274         }
275         
276         char ack_buf[] = { (char) DINIT_RP_LISTDONE };
277         if (! queuePacket(ack_buf, 1)) return false;
278         
279         return true;
280     }
281     catch (std::bad_alloc &exc)
282     {
283         doOomClose();
284         return true;
285     }
286 }
287
288 control_conn_t::handle_t control_conn_t::allocateServiceHandle(service_record *record)
289 {
290     bool is_unique = true;
291     handle_t largest_seen = 0;
292     handle_t candidate = 0;
293     for (auto p : keyServiceMap) {
294         if (p.first > largest_seen) largest_seen = p.first;
295         if (p.first == candidate) {
296             if (largest_seen == std::numeric_limits<handle_t>::max()) throw std::bad_alloc();
297             candidate = largest_seen + 1;
298         }
299         is_unique &= (p.second != record);
300     }
301     
302     keyServiceMap[candidate] = record;
303     serviceKeyMap.insert(std::make_pair(record, candidate));
304     
305     if (is_unique) {
306         record->addListener(this);
307     }
308     
309     return candidate;
310 }
311
312
313 bool control_conn_t::queuePacket(const char *pkt, unsigned size) noexcept
314 {
315     int in_flag = bad_conn_close ? 0 : IN_EVENTS;
316     bool was_empty = outbuf.empty();
317
318     // If the queue is empty, we can try to write the packet out now rather than queueing it.
319     // If the write is unsuccessful or partial, we queue the remainder.
320     if (was_empty) {
321         int wr = write(iob.get_watched_fd(), pkt, size);
322         if (wr == -1) {
323             if (errno == EPIPE) {
324                 return false;
325             }
326             if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
327                 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
328                 return false;
329             }
330             // EAGAIN etc: fall through to below
331         }
332         else {
333             if ((unsigned)wr == size) {
334                 // Ok, all written.
335                 iob.set_watches(in_flag);
336                 return true;
337             }
338             pkt += wr;
339             size -= wr;
340         }
341     }
342     
343     // Create a vector out of the (remaining part of the) packet:
344     try {
345         outbuf.emplace_back(pkt, pkt + size);
346         iob.set_watches(in_flag | OUT_EVENTS);
347         return true;
348     }
349     catch (std::bad_alloc &baexc) {
350         // Mark the connection bad, and stop reading further requests
351         bad_conn_close = true;
352         oom_close = true;
353         if (was_empty) {
354             // We can't send out-of-memory response as we already wrote as much as we
355             // could above. Neither can we later send the response since we have currently
356             // sent an incomplete packet. All we can do is close the connection.
357             return false;
358         }
359         else {
360             iob.set_watches(OUT_EVENTS);
361             return true;
362         }
363     }
364 }
365
366 // This queuePacket method is frustratingly similar to the one above, but the subtle differences
367 // make them extraordinary difficult to combine into a single method.
368 bool control_conn_t::queuePacket(std::vector<char> &&pkt) noexcept
369 {
370     int in_flag = bad_conn_close ? 0 : IN_EVENTS;
371     bool was_empty = outbuf.empty();
372     
373     if (was_empty) {
374         outpkt_index = 0;
375         // We can try sending the packet immediately:
376         int wr = write(iob.get_watched_fd(), pkt.data(), pkt.size());
377         if (wr == -1) {
378             if (errno == EPIPE) {
379                 return false;
380             }
381             if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
382                 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
383                 return false;
384             }
385             // EAGAIN etc: fall through to below
386         }
387         else {
388             if ((unsigned)wr == pkt.size()) {
389                 // Ok, all written.
390                 iob.set_watches(in_flag);
391                 return true;
392             }
393             outpkt_index = wr;
394         }
395     }
396     
397     try {
398         outbuf.emplace_back(pkt);
399         iob.set_watches(in_flag | OUT_EVENTS);
400         return true;
401     }
402     catch (std::bad_alloc &baexc) {
403         // Mark the connection bad, and stop reading further requests
404         bad_conn_close = true;
405         oom_close = true;
406         if (was_empty) {
407             // We can't send out-of-memory response as we already wrote as much as we
408             // could above. Neither can we later send the response since we have currently
409             // sent an incomplete packet. All we can do is close the connection.
410             return false;
411         }
412         else {
413             iob.set_watches(OUT_EVENTS);
414             return true;
415         }
416     }
417 }
418
419 bool control_conn_t::rollbackComplete() noexcept
420 {
421     char ackBuf[2] = { DINIT_ROLLBACK_COMPLETED, 2 };
422     return queuePacket(ackBuf, 2);
423 }
424
425 bool control_conn_t::dataReady() noexcept
426 {
427     int fd = iob.get_watched_fd();
428     
429     int r = rbuf.fill(fd);
430     
431     // Note file descriptor is non-blocking
432     if (r == -1) {
433         if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
434             log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
435             return true;
436         }
437         return false;
438     }
439     
440     if (r == 0) {
441         return true;
442     }
443     
444     // complete packet?
445     if (rbuf.get_length() >= chklen) {
446         try {
447             return !processPacket();
448         }
449         catch (std::bad_alloc &baexc) {
450             doOomClose();
451             return false;
452         }
453     }
454     else if (rbuf.get_length() == 1024) {
455         // Too big packet
456         log(loglevel_t::WARN, "Received too-large control package; dropping connection");
457         bad_conn_close = true;
458         iob.set_watches(OUT_EVENTS);
459     }
460     else {
461         int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0;
462         iob.set_watches(IN_EVENTS | out_flags);
463     }
464     
465     return false;
466 }
467
468 bool control_conn_t::sendData() noexcept
469 {
470     if (outbuf.empty() && bad_conn_close) {
471         if (oom_close) {
472             // Send oom response
473             char oomBuf[] = { DINIT_RP_OOM };
474             write(iob.get_watched_fd(), oomBuf, 1);
475         }
476         return true;
477     }
478     
479     vector<char> & pkt = outbuf.front();
480     char *data = pkt.data();
481     int written = write(iob.get_watched_fd(), data + outpkt_index, pkt.size() - outpkt_index);
482     if (written == -1) {
483         if (errno == EPIPE) {
484             // read end closed
485             return true;
486         }
487         else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
488             // spurious readiness notification?
489         }
490         else {
491             log(loglevel_t::ERROR, "Error writing to control connection: ", strerror(errno));
492             return true;
493         }
494         return false;
495     }
496
497     outpkt_index += written;
498     if (outpkt_index == pkt.size()) {
499         // We've finished this packet, move on to the next:
500         outbuf.pop_front();
501         outpkt_index = 0;
502         if (outbuf.empty() && ! oom_close) {
503             if (! bad_conn_close) {
504                 iob.set_watches(IN_EVENTS);
505             }
506             else {
507                 return true;
508             }
509         }
510     }
511     
512     return false;
513 }
514
515 control_conn_t::~control_conn_t() noexcept
516 {
517     close(iob.get_watched_fd());
518     iob.deregister(*loop);
519     
520     // Clear service listeners
521     for (auto p : serviceKeyMap) {
522         p.first->removeListener(this);
523     }
524     
525     active_control_conns--;
526 }