Implement service unloading.
[oweals/dinit.git] / src / control.cc
1 #include "control.h"
2 #include "service.h"
3
4 namespace {
5     constexpr auto OUT_EVENTS = dasynq::OUT_EVENTS;
6     constexpr auto IN_EVENTS = dasynq::IN_EVENTS;
7 }
8
9 bool control_conn_t::process_packet()
10 {
11     using std::string;
12     
13     // Note that where we call queue_packet, we must generally check the return value. If it
14     // returns false it has either deleted the connection or marked it for deletion; we
15     // shouldn't touch instance members after that point.
16
17     int pktType = rbuf[0];
18     if (pktType == DINIT_CP_QUERYVERSION) {
19         // Responds with:
20         // DINIT_RP_CVERSION, (2 byte) minimum compatible version, (2 byte) maximum compatible version
21         char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 };
22         if (! queue_packet(replyBuf, 1)) return false;
23         rbuf.consume(1);
24         return true;
25     }
26     if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) {
27         return process_find_load(pktType);
28     }
29     if (pktType == DINIT_CP_STARTSERVICE || pktType == DINIT_CP_STOPSERVICE
30             || pktType == DINIT_CP_WAKESERVICE || pktType == DINIT_CP_RELEASESERVICE) {
31         return process_start_stop(pktType);
32     }
33     if (pktType == DINIT_CP_UNPINSERVICE) {
34         return process_unpin_service();
35     }
36     if (pktType == DINIT_CP_UNLOADSERVICE) {
37         return process_unload_service();
38     }
39     if (pktType == DINIT_CP_SHUTDOWN) {
40         // Shutdown/reboot
41         if (rbuf.get_length() < 2) {
42             chklen = 2;
43             return true;
44         }
45         
46         auto sd_type = static_cast<shutdown_type_t>(rbuf[1]);
47         
48         services->stop_all_services(sd_type);
49         char ackBuf[] = { DINIT_RP_ACK };
50         if (! queue_packet(ackBuf, 1)) return false;
51         
52         // Clear the packet from the buffer
53         rbuf.consume(2);
54         chklen = 0;
55         return true;
56     }
57     if (pktType == DINIT_CP_LISTSERVICES) {
58         return list_services();
59     }
60     else {
61         // Unrecognized: give error response
62         char outbuf[] = { DINIT_RP_BADREQ };
63         if (! queue_packet(outbuf, 1)) return false;
64         bad_conn_close = true;
65         iob.set_watches(OUT_EVENTS);
66     }
67     return true;
68 }
69
70 bool control_conn_t::process_find_load(int pktType)
71 {
72     using std::string;
73     
74     constexpr int pkt_size = 4;
75     
76     if (rbuf.get_length() < pkt_size) {
77         chklen = pkt_size;
78         return true;
79     }
80     
81     uint16_t svcSize;
82     rbuf.extract((char *)&svcSize, 1, 2);
83     chklen = svcSize + 3; // packet type + (2 byte) length + service name
84     if (svcSize <= 0 || chklen > 1024) {
85         // Queue error response / mark connection bad
86         char badreqRep[] = { DINIT_RP_BADREQ };
87         if (! queue_packet(badreqRep, 1)) return false;
88         bad_conn_close = true;
89         iob.set_watches(OUT_EVENTS);
90         return true;
91     }
92     
93     if (rbuf.get_length() < chklen) {
94         // packet not complete yet; read more
95         return true;
96     }
97     
98     service_record * record = nullptr;
99     
100     string serviceName = rbuf.extract_string(3, svcSize);
101     
102     if (pktType == DINIT_CP_LOADSERVICE) {
103         // LOADSERVICE
104         try {
105             record = services->load_service(serviceName.c_str());
106         }
107         catch (service_load_exc &slexc) {
108             log(loglevel_t::ERROR, "Could not load service ", slexc.serviceName, ": ", slexc.excDescription);
109         }
110     }
111     else {
112         // FINDSERVICE
113         record = services->find_service(serviceName.c_str());
114     }
115     
116     if (record != nullptr) {
117         // Allocate a service handle
118         handle_t handle = allocate_service_handle(record);
119         std::vector<char> rp_buf;
120         rp_buf.reserve(7);
121         rp_buf.push_back(DINIT_RP_SERVICERECORD);
122         rp_buf.push_back(static_cast<char>(record->get_state()));
123         for (int i = 0; i < (int) sizeof(handle); i++) {
124             rp_buf.push_back(*(((char *) &handle) + i));
125         }
126         rp_buf.push_back(static_cast<char>(record->get_target_state()));
127         if (! queue_packet(std::move(rp_buf))) return false;
128     }
129     else {
130         std::vector<char> rp_buf = { DINIT_RP_NOSERVICE };
131         if (! queue_packet(std::move(rp_buf))) return false;
132     }
133     
134     // Clear the packet from the buffer
135     rbuf.consume(chklen);
136     chklen = 0;
137     return true;
138 }
139
140 bool control_conn_t::process_start_stop(int pktType)
141 {
142     using std::string;
143     
144     constexpr int pkt_size = 2 + sizeof(handle_t);
145     
146     if (rbuf.get_length() < pkt_size) {
147         chklen = pkt_size;
148         return true;
149     }
150     
151     // 1 byte: packet type
152     // 1 byte: pin in requested state (0 = no pin, 1 = pin)
153     // 4 bytes: service handle
154     
155     bool do_pin = (rbuf[1] == 1);
156     handle_t handle;
157     rbuf.extract((char *) &handle, 2, sizeof(handle));
158     
159     service_record *service = find_service_for_key(handle);
160     if (service == nullptr) {
161         // Service handle is bad
162         char badreqRep[] = { DINIT_RP_BADREQ };
163         if (! queue_packet(badreqRep, 1)) return false;
164         bad_conn_close = true;
165         iob.set_watches(OUT_EVENTS);
166         return true;
167     }
168     else {
169         bool already_there = false;
170         
171         switch (pktType) {
172         case DINIT_CP_STARTSERVICE:
173             // start service, mark as required
174             if (do_pin) service->pin_start();
175             service->start();
176             services->process_queues();
177             already_there = service->get_state() == service_state_t::STARTED;
178             break;
179         case DINIT_CP_STOPSERVICE:
180             // force service to stop
181             if (do_pin) service->pin_stop();
182             service->stop(true);
183             service->forced_stop();
184             services->process_queues();
185             already_there = service->get_state() == service_state_t::STOPPED;
186             break;
187         case DINIT_CP_WAKESERVICE:
188             // re-start a stopped service (do not mark as required)
189             if (do_pin) service->pin_start();
190             service->start(false);
191             services->process_queues();
192             already_there = service->get_state() == service_state_t::STARTED;
193             break;
194         case DINIT_CP_RELEASESERVICE:
195             // remove required mark, stop if not required by dependents
196             if (do_pin) service->pin_stop();
197             service->stop(false);
198             services->process_queues();
199             already_there = service->get_state() == service_state_t::STOPPED;
200             break;
201         }
202         
203         char ack_buf[] = { (char)(already_there ? DINIT_RP_ALREADYSS : DINIT_RP_ACK) };
204         
205         if (! queue_packet(ack_buf, 1)) return false;
206     }
207     
208     // Clear the packet from the buffer
209     rbuf.consume(pkt_size);
210     chklen = 0;
211     return true;
212 }
213
214 bool control_conn_t::process_unpin_service()
215 {
216     using std::string;
217     
218     constexpr int pkt_size = 1 + sizeof(handle_t);
219     
220     if (rbuf.get_length() < pkt_size) {
221         chklen = pkt_size;
222         return true;
223     }
224     
225     // 1 byte: packet type
226     // 4 bytes: service handle
227     
228     handle_t handle;
229     rbuf.extract((char *) &handle, 1, sizeof(handle));
230     
231     service_record *service = find_service_for_key(handle);
232     if (service == nullptr) {
233         // Service handle is bad
234         char badreqRep[] = { DINIT_RP_BADREQ };
235         if (! queue_packet(badreqRep, 1)) return false;
236         bad_conn_close = true;
237         iob.set_watches(OUT_EVENTS);
238         return true;
239     }
240
241     service->unpin();
242     services->process_queues();
243     char ack_buf[] = { (char) DINIT_RP_ACK };
244     if (! queue_packet(ack_buf, 1)) return false;
245     
246     // Clear the packet from the buffer
247     rbuf.consume(pkt_size);
248     chklen = 0;
249     return true;
250 }
251
252 bool control_conn_t::process_unload_service()
253 {
254     using std::string;
255
256     constexpr int pkt_size = 1 + sizeof(handle_t);
257
258     if (rbuf.get_length() < pkt_size) {
259         chklen = pkt_size;
260         return true;
261     }
262
263     // 1 byte: packet type
264     // 4 bytes: service handle
265
266     handle_t handle;
267     rbuf.extract((char *) &handle, 1, sizeof(handle));
268
269     service_record *service = find_service_for_key(handle);
270     if (service == nullptr) {
271         // Service handle is bad
272         char badreq_rep[] = { DINIT_RP_BADREQ };
273         if (! queue_packet(badreq_rep, 1)) return false;
274         bad_conn_close = true;
275         iob.set_watches(OUT_EVENTS);
276         return true;
277     }
278
279     if (! service->has_lone_ref() || service->get_state() != service_state_t::STOPPED) {
280         // Cannot unload: has other references
281         char nak_rep[] = { DINIT_RP_NAK };
282         if (! queue_packet(nak_rep, 1)) return false;
283     }
284     else {
285         // unload
286         service->prepare_for_unload();
287         services->remove_service(service);
288         delete service;
289
290         // drop handle
291         service_key_map.erase(service);
292         key_service_map.erase(handle);
293
294         // send ack
295         char ack_buf[] = { (char) DINIT_RP_ACK };
296         if (! queue_packet(ack_buf, 1)) return false;
297     }
298
299     // Clear the packet from the buffer
300     rbuf.consume(pkt_size);
301     chklen = 0;
302     return true;
303 }
304
305 bool control_conn_t::list_services()
306 {
307     rbuf.consume(1); // clear request packet
308     chklen = 0;
309     
310     try {
311         auto slist = services->list_services();
312         for (auto sptr : slist) {
313             std::vector<char> pkt_buf;
314             
315             const std::string &name = sptr->get_name();
316             int nameLen = std::min((size_t)256, name.length());
317             pkt_buf.resize(8 + nameLen);
318             
319             pkt_buf[0] = DINIT_RP_SVCINFO;
320             pkt_buf[1] = nameLen;
321             pkt_buf[2] = static_cast<char>(sptr->get_state());
322             pkt_buf[3] = static_cast<char>(sptr->get_target_state());
323             
324             pkt_buf[4] = 0; // reserved
325             pkt_buf[5] = 0;
326             pkt_buf[6] = 0;
327             pkt_buf[7] = 0;
328             
329             for (int i = 0; i < nameLen; i++) {
330                 pkt_buf[8+i] = name[i];
331             }
332             
333             if (! queue_packet(std::move(pkt_buf))) return false;
334         }
335         
336         char ack_buf[] = { (char) DINIT_RP_LISTDONE };
337         if (! queue_packet(ack_buf, 1)) return false;
338         
339         return true;
340     }
341     catch (std::bad_alloc &exc)
342     {
343         do_oom_close();
344         return true;
345     }
346 }
347
348 control_conn_t::handle_t control_conn_t::allocate_service_handle(service_record *record)
349 {
350     // Try to find a unique handle (integer) in a single pass. Since the map is ordered, we can search until
351     // we find a gap in the handle values.
352     handle_t candidate = 0;
353     for (auto p : key_service_map) {
354         if (p.first == candidate) candidate++;
355         else break;
356     }
357
358     bool is_unique = (service_key_map.find(record) == service_key_map.end());
359
360     // The following operations perform allocation (can throw std::bad_alloc). If an exception occurs we
361     // must undo any previous actions:
362     if (is_unique) {
363         record->add_listener(this);
364     }
365     
366     try {
367         key_service_map[candidate] = record;
368         service_key_map.insert(std::make_pair(record, candidate));
369     }
370     catch (...) {
371         if (is_unique) {
372             record->remove_listener(this);
373         }
374
375         key_service_map.erase(candidate);
376     }
377     
378     return candidate;
379 }
380
381 bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept
382 {
383     int in_flag = bad_conn_close ? 0 : IN_EVENTS;
384     bool was_empty = outbuf.empty();
385
386     // If the queue is empty, we can try to write the packet out now rather than queueing it.
387     // If the write is unsuccessful or partial, we queue the remainder.
388     if (was_empty) {
389         int wr = write(iob.get_watched_fd(), pkt, size);
390         if (wr == -1) {
391             if (errno == EPIPE) {
392                 return false;
393             }
394             if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
395                 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
396                 return false;
397             }
398             // EAGAIN etc: fall through to below
399         }
400         else {
401             if ((unsigned)wr == size) {
402                 // Ok, all written.
403                 iob.set_watches(in_flag);
404                 return true;
405             }
406             pkt += wr;
407             size -= wr;
408         }
409     }
410     
411     // Create a vector out of the (remaining part of the) packet:
412     try {
413         outbuf.emplace_back(pkt, pkt + size);
414         iob.set_watches(in_flag | OUT_EVENTS);
415         return true;
416     }
417     catch (std::bad_alloc &baexc) {
418         // Mark the connection bad, and stop reading further requests
419         bad_conn_close = true;
420         oom_close = true;
421         if (was_empty) {
422             // We can't send out-of-memory response as we already wrote as much as we
423             // could above. Neither can we later send the response since we have currently
424             // sent an incomplete packet. All we can do is close the connection.
425             return false;
426         }
427         else {
428             iob.set_watches(OUT_EVENTS);
429             return true;
430         }
431     }
432 }
433
434 // This queue_packet method is frustratingly similar to the one above, but the subtle differences
435 // make them extraordinary difficult to combine into a single method.
436 bool control_conn_t::queue_packet(std::vector<char> &&pkt) noexcept
437 {
438     int in_flag = bad_conn_close ? 0 : IN_EVENTS;
439     bool was_empty = outbuf.empty();
440     
441     if (was_empty) {
442         outpkt_index = 0;
443         // We can try sending the packet immediately:
444         int wr = write(iob.get_watched_fd(), pkt.data(), pkt.size());
445         if (wr == -1) {
446             if (errno == EPIPE) {
447                 return false;
448             }
449             if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
450                 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
451                 return false;
452             }
453             // EAGAIN etc: fall through to below
454         }
455         else {
456             if ((unsigned)wr == pkt.size()) {
457                 // Ok, all written.
458                 iob.set_watches(in_flag);
459                 return true;
460             }
461             outpkt_index = wr;
462         }
463     }
464     
465     try {
466         outbuf.emplace_back(pkt);
467         iob.set_watches(in_flag | OUT_EVENTS);
468         return true;
469     }
470     catch (std::bad_alloc &baexc) {
471         // Mark the connection bad, and stop reading further requests
472         bad_conn_close = true;
473         oom_close = true;
474         if (was_empty) {
475             // We can't send out-of-memory response as we already wrote as much as we
476             // could above. Neither can we later send the response since we have currently
477             // sent an incomplete packet. All we can do is close the connection.
478             return false;
479         }
480         else {
481             iob.set_watches(OUT_EVENTS);
482             return true;
483         }
484     }
485 }
486
487 bool control_conn_t::rollback_complete() noexcept
488 {
489     char ackBuf[2] = { DINIT_ROLLBACK_COMPLETED, 2 };
490     return queue_packet(ackBuf, 2);
491 }
492
493 bool control_conn_t::data_ready() noexcept
494 {
495     int fd = iob.get_watched_fd();
496     
497     int r = rbuf.fill(fd);
498     
499     // Note file descriptor is non-blocking
500     if (r == -1) {
501         if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
502             log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
503             return true;
504         }
505         return false;
506     }
507     
508     if (r == 0) {
509         return true;
510     }
511     
512     // complete packet?
513     if (rbuf.get_length() >= chklen) {
514         try {
515             return !process_packet();
516         }
517         catch (std::bad_alloc &baexc) {
518             do_oom_close();
519             return false;
520         }
521     }
522     else if (rbuf.get_length() == 1024) {
523         // Too big packet
524         log(loglevel_t::WARN, "Received too-large control package; dropping connection");
525         bad_conn_close = true;
526         iob.set_watches(OUT_EVENTS);
527     }
528     else {
529         int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0;
530         iob.set_watches(IN_EVENTS | out_flags);
531     }
532     
533     return false;
534 }
535
536 bool control_conn_t::send_data() noexcept
537 {
538     if (outbuf.empty() && bad_conn_close) {
539         if (oom_close) {
540             // Send oom response
541             char oomBuf[] = { DINIT_RP_OOM };
542             write(iob.get_watched_fd(), oomBuf, 1);
543         }
544         return true;
545     }
546     
547     vector<char> & pkt = outbuf.front();
548     char *data = pkt.data();
549     int written = write(iob.get_watched_fd(), data + outpkt_index, pkt.size() - outpkt_index);
550     if (written == -1) {
551         if (errno == EPIPE) {
552             // read end closed
553             return true;
554         }
555         else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
556             // spurious readiness notification?
557         }
558         else {
559             log(loglevel_t::ERROR, "Error writing to control connection: ", strerror(errno));
560             return true;
561         }
562         return false;
563     }
564
565     outpkt_index += written;
566     if (outpkt_index == pkt.size()) {
567         // We've finished this packet, move on to the next:
568         outbuf.pop_front();
569         outpkt_index = 0;
570         if (outbuf.empty() && ! oom_close) {
571             if (! bad_conn_close) {
572                 iob.set_watches(IN_EVENTS);
573             }
574             else {
575                 return true;
576             }
577         }
578     }
579     
580     return false;
581 }
582
583 control_conn_t::~control_conn_t() noexcept
584 {
585     close(iob.get_watched_fd());
586     iob.deregister(loop);
587     
588     // Clear service listeners
589     for (auto p : service_key_map) {
590         p.first->remove_listener(this);
591     }
592     
593     active_control_conns--;
594 }