Move some #include's around to fix build issue.
[oweals/dinit.git] / src / control.cc
1 #include "control.h"
2 #include "service.h"
3
4 namespace {
5     constexpr auto OUT_EVENTS = dasynq::OUT_EVENTS;
6     constexpr auto IN_EVENTS = dasynq::IN_EVENTS;
7 }
8
9 bool control_conn_t::process_packet()
10 {
11     using std::string;
12     
13     // Note that where we call queue_packet, we must generally check the return value. If it
14     // returns false it has either deleted the connection or marked it for deletion; we
15     // shouldn't touch instance members after that point.
16
17     int pktType = rbuf[0];
18     if (pktType == DINIT_CP_QUERYVERSION) {
19         // Responds with:
20         // DINIT_RP_CVERSION, (2 byte) minimum compatible version, (2 byte) maximum compatible version
21         char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 };
22         if (! queue_packet(replyBuf, 1)) return false;
23         rbuf.consume(1);
24         return true;
25     }
26     if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) {
27         return process_find_load(pktType);
28     }
29     if (pktType == DINIT_CP_STARTSERVICE || pktType == DINIT_CP_STOPSERVICE
30             || pktType == DINIT_CP_WAKESERVICE || pktType == DINIT_CP_RELEASESERVICE) {
31         return process_start_stop(pktType);
32     }
33     if (pktType == DINIT_CP_UNPINSERVICE) {
34         return process_unpin_service();
35     }
36     if (pktType == DINIT_CP_SHUTDOWN) {
37         // Shutdown/reboot
38         if (rbuf.get_length() < 2) {
39             chklen = 2;
40             return true;
41         }
42         
43         auto sd_type = static_cast<shutdown_type_t>(rbuf[1]);
44         
45         services->stop_all_services(sd_type);
46         char ackBuf[] = { DINIT_RP_ACK };
47         if (! queue_packet(ackBuf, 1)) return false;
48         
49         // Clear the packet from the buffer
50         rbuf.consume(2);
51         chklen = 0;
52         return true;
53     }
54     if (pktType == DINIT_CP_LISTSERVICES) {
55         return list_services();
56     }
57     else {
58         // Unrecognized: give error response
59         char outbuf[] = { DINIT_RP_BADREQ };
60         if (! queue_packet(outbuf, 1)) return false;
61         bad_conn_close = true;
62         iob.set_watches(OUT_EVENTS);
63     }
64     return true;
65 }
66
67 bool control_conn_t::process_find_load(int pktType)
68 {
69     using std::string;
70     
71     constexpr int pkt_size = 4;
72     
73     if (rbuf.get_length() < pkt_size) {
74         chklen = pkt_size;
75         return true;
76     }
77     
78     uint16_t svcSize;
79     rbuf.extract((char *)&svcSize, 1, 2);
80     chklen = svcSize + 3; // packet type + (2 byte) length + service name
81     if (svcSize <= 0 || chklen > 1024) {
82         // Queue error response / mark connection bad
83         char badreqRep[] = { DINIT_RP_BADREQ };
84         if (! queue_packet(badreqRep, 1)) return false;
85         bad_conn_close = true;
86         iob.set_watches(OUT_EVENTS);
87         return true;
88     }
89     
90     if (rbuf.get_length() < chklen) {
91         // packet not complete yet; read more
92         return true;
93     }
94     
95     service_record * record = nullptr;
96     
97     string serviceName = rbuf.extract_string(3, svcSize);
98     
99     if (pktType == DINIT_CP_LOADSERVICE) {
100         // LOADSERVICE
101         try {
102             record = services->load_service(serviceName.c_str());
103         }
104         catch (service_load_exc &slexc) {
105             log(loglevel_t::ERROR, "Could not load service ", slexc.serviceName, ": ", slexc.excDescription);
106         }
107     }
108     else {
109         // FINDSERVICE
110         record = services->find_service(serviceName.c_str());
111     }
112     
113     if (record != nullptr) {
114         // Allocate a service handle
115         handle_t handle = allocate_service_handle(record);
116         std::vector<char> rp_buf;
117         rp_buf.reserve(7);
118         rp_buf.push_back(DINIT_RP_SERVICERECORD);
119         rp_buf.push_back(static_cast<char>(record->get_state()));
120         for (int i = 0; i < (int) sizeof(handle); i++) {
121             rp_buf.push_back(*(((char *) &handle) + i));
122         }
123         rp_buf.push_back(static_cast<char>(record->get_target_state()));
124         if (! queue_packet(std::move(rp_buf))) return false;
125     }
126     else {
127         std::vector<char> rp_buf = { DINIT_RP_NOSERVICE };
128         if (! queue_packet(std::move(rp_buf))) return false;
129     }
130     
131     // Clear the packet from the buffer
132     rbuf.consume(chklen);
133     chklen = 0;
134     return true;
135 }
136
137 bool control_conn_t::process_start_stop(int pktType)
138 {
139     using std::string;
140     
141     constexpr int pkt_size = 2 + sizeof(handle_t);
142     
143     if (rbuf.get_length() < pkt_size) {
144         chklen = pkt_size;
145         return true;
146     }
147     
148     // 1 byte: packet type
149     // 1 byte: pin in requested state (0 = no pin, 1 = pin)
150     // 4 bytes: service handle
151     
152     bool do_pin = (rbuf[1] == 1);
153     handle_t handle;
154     rbuf.extract((char *) &handle, 2, sizeof(handle));
155     
156     service_record *service = find_service_for_key(handle);
157     if (service == nullptr) {
158         // Service handle is bad
159         char badreqRep[] = { DINIT_RP_BADREQ };
160         if (! queue_packet(badreqRep, 1)) return false;
161         bad_conn_close = true;
162         iob.set_watches(OUT_EVENTS);
163         return true;
164     }
165     else {
166         bool already_there = false;
167         
168         switch (pktType) {
169         case DINIT_CP_STARTSERVICE:
170             // start service, mark as required
171             if (do_pin) service->pin_start();
172             service->start();
173             services->process_queues();
174             already_there = service->get_state() == service_state_t::STARTED;
175             break;
176         case DINIT_CP_STOPSERVICE:
177             // force service to stop
178             if (do_pin) service->pin_stop();
179             service->stop(true);
180             service->forced_stop();
181             services->process_queues();
182             already_there = service->get_state() == service_state_t::STOPPED;
183             break;
184         case DINIT_CP_WAKESERVICE:
185             // re-start a stopped service (do not mark as required)
186             if (do_pin) service->pin_start();
187             service->start(false);
188             services->process_queues();
189             already_there = service->get_state() == service_state_t::STARTED;
190             break;
191         case DINIT_CP_RELEASESERVICE:
192             // remove required mark, stop if not required by dependents
193             if (do_pin) service->pin_stop();
194             service->stop(false);
195             services->process_queues();
196             already_there = service->get_state() == service_state_t::STOPPED;
197             break;
198         }
199         
200         char ack_buf[] = { (char)(already_there ? DINIT_RP_ALREADYSS : DINIT_RP_ACK) };
201         
202         if (! queue_packet(ack_buf, 1)) return false;
203     }
204     
205     // Clear the packet from the buffer
206     rbuf.consume(pkt_size);
207     chklen = 0;
208     return true;
209 }
210
211 bool control_conn_t::process_unpin_service()
212 {
213     using std::string;
214     
215     constexpr int pkt_size = 1 + sizeof(handle_t);
216     
217     if (rbuf.get_length() < pkt_size) {
218         chklen = pkt_size;
219         return true;
220     }
221     
222     // 1 byte: packet type
223     // 4 bytes: service handle
224     
225     handle_t handle;
226     rbuf.extract((char *) &handle, 1, sizeof(handle));
227     
228     service_record *service = find_service_for_key(handle);
229     if (service == nullptr) {
230         // Service handle is bad
231         char badreqRep[] = { DINIT_RP_BADREQ };
232         if (! queue_packet(badreqRep, 1)) return false;
233         bad_conn_close = true;
234         iob.set_watches(OUT_EVENTS);
235         return true;
236     }
237     else {
238         service->unpin();
239         services->process_queues();
240         char ack_buf[] = { (char) DINIT_RP_ACK };
241         if (! queue_packet(ack_buf, 1)) return false;
242     }
243     
244     // Clear the packet from the buffer
245     rbuf.consume(pkt_size);
246     chklen = 0;
247     return true;
248 }
249
250 bool control_conn_t::list_services()
251 {
252     rbuf.consume(1); // clear request packet
253     chklen = 0;
254     
255     try {
256         auto slist = services->list_services();
257         for (auto sptr : slist) {
258             std::vector<char> pkt_buf;
259             
260             const std::string &name = sptr->get_name();
261             int nameLen = std::min((size_t)256, name.length());
262             pkt_buf.resize(8 + nameLen);
263             
264             pkt_buf[0] = DINIT_RP_SVCINFO;
265             pkt_buf[1] = nameLen;
266             pkt_buf[2] = static_cast<char>(sptr->get_state());
267             pkt_buf[3] = static_cast<char>(sptr->get_target_state());
268             
269             pkt_buf[4] = 0; // reserved
270             pkt_buf[5] = 0;
271             pkt_buf[6] = 0;
272             pkt_buf[7] = 0;
273             
274             for (int i = 0; i < nameLen; i++) {
275                 pkt_buf[8+i] = name[i];
276             }
277             
278             if (! queue_packet(std::move(pkt_buf))) return false;
279         }
280         
281         char ack_buf[] = { (char) DINIT_RP_LISTDONE };
282         if (! queue_packet(ack_buf, 1)) return false;
283         
284         return true;
285     }
286     catch (std::bad_alloc &exc)
287     {
288         do_oom_close();
289         return true;
290     }
291 }
292
293 control_conn_t::handle_t control_conn_t::allocate_service_handle(service_record *record)
294 {
295     bool is_unique = true;
296     handle_t largest_seen = 0;
297     handle_t candidate = 0;
298     for (auto p : keyServiceMap) {
299         if (p.first > largest_seen) largest_seen = p.first;
300         if (p.first == candidate) {
301             if (largest_seen == std::numeric_limits<handle_t>::max()) throw std::bad_alloc();
302             candidate = largest_seen + 1;
303         }
304         is_unique &= (p.second != record);
305     }
306     
307     keyServiceMap[candidate] = record;
308     serviceKeyMap.insert(std::make_pair(record, candidate));
309     
310     if (is_unique) {
311         record->add_listener(this);
312     }
313     
314     return candidate;
315 }
316
317 bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept
318 {
319     int in_flag = bad_conn_close ? 0 : IN_EVENTS;
320     bool was_empty = outbuf.empty();
321
322     // If the queue is empty, we can try to write the packet out now rather than queueing it.
323     // If the write is unsuccessful or partial, we queue the remainder.
324     if (was_empty) {
325         int wr = write(iob.get_watched_fd(), pkt, size);
326         if (wr == -1) {
327             if (errno == EPIPE) {
328                 return false;
329             }
330             if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
331                 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
332                 return false;
333             }
334             // EAGAIN etc: fall through to below
335         }
336         else {
337             if ((unsigned)wr == size) {
338                 // Ok, all written.
339                 iob.set_watches(in_flag);
340                 return true;
341             }
342             pkt += wr;
343             size -= wr;
344         }
345     }
346     
347     // Create a vector out of the (remaining part of the) packet:
348     try {
349         outbuf.emplace_back(pkt, pkt + size);
350         iob.set_watches(in_flag | OUT_EVENTS);
351         return true;
352     }
353     catch (std::bad_alloc &baexc) {
354         // Mark the connection bad, and stop reading further requests
355         bad_conn_close = true;
356         oom_close = true;
357         if (was_empty) {
358             // We can't send out-of-memory response as we already wrote as much as we
359             // could above. Neither can we later send the response since we have currently
360             // sent an incomplete packet. All we can do is close the connection.
361             return false;
362         }
363         else {
364             iob.set_watches(OUT_EVENTS);
365             return true;
366         }
367     }
368 }
369
370 // This queue_packet method is frustratingly similar to the one above, but the subtle differences
371 // make them extraordinary difficult to combine into a single method.
372 bool control_conn_t::queue_packet(std::vector<char> &&pkt) noexcept
373 {
374     int in_flag = bad_conn_close ? 0 : IN_EVENTS;
375     bool was_empty = outbuf.empty();
376     
377     if (was_empty) {
378         outpkt_index = 0;
379         // We can try sending the packet immediately:
380         int wr = write(iob.get_watched_fd(), pkt.data(), pkt.size());
381         if (wr == -1) {
382             if (errno == EPIPE) {
383                 return false;
384             }
385             if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
386                 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
387                 return false;
388             }
389             // EAGAIN etc: fall through to below
390         }
391         else {
392             if ((unsigned)wr == pkt.size()) {
393                 // Ok, all written.
394                 iob.set_watches(in_flag);
395                 return true;
396             }
397             outpkt_index = wr;
398         }
399     }
400     
401     try {
402         outbuf.emplace_back(pkt);
403         iob.set_watches(in_flag | OUT_EVENTS);
404         return true;
405     }
406     catch (std::bad_alloc &baexc) {
407         // Mark the connection bad, and stop reading further requests
408         bad_conn_close = true;
409         oom_close = true;
410         if (was_empty) {
411             // We can't send out-of-memory response as we already wrote as much as we
412             // could above. Neither can we later send the response since we have currently
413             // sent an incomplete packet. All we can do is close the connection.
414             return false;
415         }
416         else {
417             iob.set_watches(OUT_EVENTS);
418             return true;
419         }
420     }
421 }
422
423 bool control_conn_t::rollback_complete() noexcept
424 {
425     char ackBuf[2] = { DINIT_ROLLBACK_COMPLETED, 2 };
426     return queue_packet(ackBuf, 2);
427 }
428
429 bool control_conn_t::data_ready() noexcept
430 {
431     int fd = iob.get_watched_fd();
432     
433     int r = rbuf.fill(fd);
434     
435     // Note file descriptor is non-blocking
436     if (r == -1) {
437         if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
438             log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
439             return true;
440         }
441         return false;
442     }
443     
444     if (r == 0) {
445         return true;
446     }
447     
448     // complete packet?
449     if (rbuf.get_length() >= chklen) {
450         try {
451             return !process_packet();
452         }
453         catch (std::bad_alloc &baexc) {
454             do_oom_close();
455             return false;
456         }
457     }
458     else if (rbuf.get_length() == 1024) {
459         // Too big packet
460         log(loglevel_t::WARN, "Received too-large control package; dropping connection");
461         bad_conn_close = true;
462         iob.set_watches(OUT_EVENTS);
463     }
464     else {
465         int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0;
466         iob.set_watches(IN_EVENTS | out_flags);
467     }
468     
469     return false;
470 }
471
472 bool control_conn_t::send_data() noexcept
473 {
474     if (outbuf.empty() && bad_conn_close) {
475         if (oom_close) {
476             // Send oom response
477             char oomBuf[] = { DINIT_RP_OOM };
478             write(iob.get_watched_fd(), oomBuf, 1);
479         }
480         return true;
481     }
482     
483     vector<char> & pkt = outbuf.front();
484     char *data = pkt.data();
485     int written = write(iob.get_watched_fd(), data + outpkt_index, pkt.size() - outpkt_index);
486     if (written == -1) {
487         if (errno == EPIPE) {
488             // read end closed
489             return true;
490         }
491         else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
492             // spurious readiness notification?
493         }
494         else {
495             log(loglevel_t::ERROR, "Error writing to control connection: ", strerror(errno));
496             return true;
497         }
498         return false;
499     }
500
501     outpkt_index += written;
502     if (outpkt_index == pkt.size()) {
503         // We've finished this packet, move on to the next:
504         outbuf.pop_front();
505         outpkt_index = 0;
506         if (outbuf.empty() && ! oom_close) {
507             if (! bad_conn_close) {
508                 iob.set_watches(IN_EVENTS);
509             }
510             else {
511                 return true;
512             }
513         }
514     }
515     
516     return false;
517 }
518
519 control_conn_t::~control_conn_t() noexcept
520 {
521     close(iob.get_watched_fd());
522     iob.deregister(loop);
523     
524     // Clear service listeners
525     for (auto p : serviceKeyMap) {
526         p.first->remove_listener(this);
527     }
528     
529     active_control_conns--;
530 }