Adjust dependency auto-break logic.
[oweals/dinit.git] / src / control.cc
1 #include <algorithm>
2 #include <unordered_set>
3
4 #include "control.h"
5 #include "service.h"
6
7 // Server-side control protocol implementation. This implements the functionality that allows
8 // clients (such as dinitctl) to query service state and issue commands to control services.
9
10 namespace {
11     constexpr auto OUT_EVENTS = dasynq::OUT_EVENTS;
12     constexpr auto IN_EVENTS = dasynq::IN_EVENTS;
13
14     // Control protocol minimum compatible version and current version:
15     constexpr uint16_t min_compat_version = 1;
16     constexpr uint16_t cp_version = 1;
17
18     // check for value in a set
19     template <typename T, int N>
20     inline bool contains(const T (&v)[N], int i)
21     {
22         return std::find_if(std::begin(v), std::end(v),
23                 [=](T p){ return i == static_cast<int>(p); }) != std::end(v);
24     }
25 }
26
27 bool control_conn_t::process_packet()
28 {
29     using std::string;
30     
31     // Note that where we call queue_packet, we must generally check the return value. If it
32     // returns false it has either deleted the connection or marked it for deletion; we
33     // shouldn't touch instance members after that point.
34
35     int pktType = rbuf[0];
36     if (pktType == DINIT_CP_QUERYVERSION) {
37         // Responds with:
38         // DINIT_RP_CVERSION, (2 byte) minimum compatible version, (2 byte) actual version
39         char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 };
40         memcpy(replyBuf + 1, &min_compat_version, 2);
41         memcpy(replyBuf + 3, &cp_version, 2);
42         if (! queue_packet(replyBuf, sizeof(replyBuf))) return false;
43         rbuf.consume(1);
44         return true;
45     }
46     if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) {
47         return process_find_load(pktType);
48     }
49     if (pktType == DINIT_CP_STARTSERVICE || pktType == DINIT_CP_STOPSERVICE
50             || pktType == DINIT_CP_WAKESERVICE || pktType == DINIT_CP_RELEASESERVICE) {
51         return process_start_stop(pktType);
52     }
53     if (pktType == DINIT_CP_UNPINSERVICE) {
54         return process_unpin_service();
55     }
56     if (pktType == DINIT_CP_UNLOADSERVICE) {
57         return process_unload_service();
58     }
59     if (pktType == DINIT_CP_SHUTDOWN) {
60         // Shutdown/reboot
61         if (rbuf.get_length() < 2) {
62             chklen = 2;
63             return true;
64         }
65         
66         if (contains({shutdown_type_t::CONTINUE, shutdown_type_t::HALT,
67                 shutdown_type_t::POWEROFF, shutdown_type_t::REBOOT}, rbuf[1])) {
68             auto sd_type = static_cast<shutdown_type_t>(rbuf[1]);
69
70             services->stop_all_services(sd_type);
71             char ackBuf[] = { DINIT_RP_ACK };
72             if (! queue_packet(ackBuf, 1)) return false;
73
74             // Clear the packet from the buffer
75             rbuf.consume(2);
76             chklen = 0;
77             return true;
78         }
79
80         // (otherwise fall through to below).
81     }
82     if (pktType == DINIT_CP_LISTSERVICES) {
83         return list_services();
84     }
85     if (pktType == DINIT_CP_ADD_DEP) {
86         return add_service_dep();
87     }
88     if (pktType == DINIT_CP_REM_DEP) {
89         return rm_service_dep();
90     }
91
92     // Unrecognized: give error response
93     char outbuf[] = { DINIT_RP_BADREQ };
94     if (! queue_packet(outbuf, 1)) return false;
95     bad_conn_close = true;
96     iob.set_watches(OUT_EVENTS);
97     return true;
98 }
99
100 bool control_conn_t::process_find_load(int pktType)
101 {
102     using std::string;
103     
104     constexpr int pkt_size = 4;
105     
106     if (rbuf.get_length() < pkt_size) {
107         chklen = pkt_size;
108         return true;
109     }
110     
111     uint16_t svcSize;
112     rbuf.extract((char *)&svcSize, 1, 2);
113     chklen = svcSize + 3; // packet type + (2 byte) length + service name
114     if (svcSize <= 0 || chklen > 1024) {
115         // Queue error response / mark connection bad
116         char badreqRep[] = { DINIT_RP_BADREQ };
117         if (! queue_packet(badreqRep, 1)) return false;
118         bad_conn_close = true;
119         iob.set_watches(OUT_EVENTS);
120         return true;
121     }
122     
123     if (rbuf.get_length() < chklen) {
124         // packet not complete yet; read more
125         return true;
126     }
127     
128     service_record * record = nullptr;
129     
130     string serviceName = rbuf.extract_string(3, svcSize);
131     
132     if (pktType == DINIT_CP_LOADSERVICE) {
133         // LOADSERVICE
134         try {
135             record = services->load_service(serviceName.c_str());
136         }
137         catch (service_load_exc &slexc) {
138             log(loglevel_t::ERROR, "Could not load service ", slexc.serviceName, ": ", slexc.excDescription);
139         }
140     }
141     else {
142         // FINDSERVICE
143         record = services->find_service(serviceName.c_str());
144     }
145     
146     if (record != nullptr) {
147         // Allocate a service handle
148         handle_t handle = allocate_service_handle(record);
149         std::vector<char> rp_buf;
150         rp_buf.reserve(7);
151         rp_buf.push_back(DINIT_RP_SERVICERECORD);
152         rp_buf.push_back(static_cast<char>(record->get_state()));
153         for (int i = 0; i < (int) sizeof(handle); i++) {
154             rp_buf.push_back(*(((char *) &handle) + i));
155         }
156         rp_buf.push_back(static_cast<char>(record->get_target_state()));
157         if (! queue_packet(std::move(rp_buf))) return false;
158     }
159     else {
160         std::vector<char> rp_buf = { DINIT_RP_NOSERVICE };
161         if (! queue_packet(std::move(rp_buf))) return false;
162     }
163     
164     // Clear the packet from the buffer
165     rbuf.consume(chklen);
166     chklen = 0;
167     return true;
168 }
169
170 bool control_conn_t::process_start_stop(int pktType)
171 {
172     using std::string;
173     
174     constexpr int pkt_size = 2 + sizeof(handle_t);
175     
176     if (rbuf.get_length() < pkt_size) {
177         chklen = pkt_size;
178         return true;
179     }
180     
181     // 1 byte: packet type
182     // 1 byte: pin in requested state (0 = no pin, 1 = pin)
183     // 4 bytes: service handle
184     
185     bool do_pin = (rbuf[1] == 1);
186     handle_t handle;
187     rbuf.extract((char *) &handle, 2, sizeof(handle));
188     
189     service_record *service = find_service_for_key(handle);
190     if (service == nullptr) {
191         // Service handle is bad
192         char badreqRep[] = { DINIT_RP_BADREQ };
193         if (! queue_packet(badreqRep, 1)) return false;
194         bad_conn_close = true;
195         iob.set_watches(OUT_EVENTS);
196         return true;
197     }
198     else {
199         bool already_there = false;
200         
201         switch (pktType) {
202         case DINIT_CP_STARTSERVICE:
203             // start service, mark as required
204             if (do_pin) service->pin_start();
205             service->start();
206             services->process_queues();
207             already_there = service->get_state() == service_state_t::STARTED;
208             break;
209         case DINIT_CP_STOPSERVICE:
210             // force service to stop
211             if (do_pin) service->pin_stop();
212             service->stop(true);
213             service->forced_stop();
214             services->process_queues();
215             already_there = service->get_state() == service_state_t::STOPPED;
216             break;
217         case DINIT_CP_WAKESERVICE:
218             // re-start a stopped service (do not mark as required)
219             if (do_pin) service->pin_start();
220             service->start(false);
221             services->process_queues();
222             already_there = service->get_state() == service_state_t::STARTED;
223             break;
224         case DINIT_CP_RELEASESERVICE:
225             // remove required mark, stop if not required by dependents
226             if (do_pin) service->pin_stop();
227             service->stop(false);
228             services->process_queues();
229             already_there = service->get_state() == service_state_t::STOPPED;
230             break;
231         }
232         
233         char ack_buf[] = { (char)(already_there ? DINIT_RP_ALREADYSS : DINIT_RP_ACK) };
234         
235         if (! queue_packet(ack_buf, 1)) return false;
236     }
237     
238     // Clear the packet from the buffer
239     rbuf.consume(pkt_size);
240     chklen = 0;
241     return true;
242 }
243
244 bool control_conn_t::process_unpin_service()
245 {
246     using std::string;
247     
248     constexpr int pkt_size = 1 + sizeof(handle_t);
249     
250     if (rbuf.get_length() < pkt_size) {
251         chklen = pkt_size;
252         return true;
253     }
254     
255     // 1 byte: packet type
256     // 4 bytes: service handle
257     
258     handle_t handle;
259     rbuf.extract((char *) &handle, 1, sizeof(handle));
260     
261     service_record *service = find_service_for_key(handle);
262     if (service == nullptr) {
263         // Service handle is bad
264         char badreqRep[] = { DINIT_RP_BADREQ };
265         if (! queue_packet(badreqRep, 1)) return false;
266         bad_conn_close = true;
267         iob.set_watches(OUT_EVENTS);
268         return true;
269     }
270
271     service->unpin();
272     services->process_queues();
273     char ack_buf[] = { (char) DINIT_RP_ACK };
274     if (! queue_packet(ack_buf, 1)) return false;
275     
276     // Clear the packet from the buffer
277     rbuf.consume(pkt_size);
278     chklen = 0;
279     return true;
280 }
281
282 bool control_conn_t::process_unload_service()
283 {
284     using std::string;
285
286     constexpr int pkt_size = 1 + sizeof(handle_t);
287
288     if (rbuf.get_length() < pkt_size) {
289         chklen = pkt_size;
290         return true;
291     }
292
293     // 1 byte: packet type
294     // 4 bytes: service handle
295
296     handle_t handle;
297     rbuf.extract((char *) &handle, 1, sizeof(handle));
298
299     service_record *service = find_service_for_key(handle);
300     if (service == nullptr) {
301         // Service handle is bad
302         char badreq_rep[] = { DINIT_RP_BADREQ };
303         if (! queue_packet(badreq_rep, 1)) return false;
304         bad_conn_close = true;
305         iob.set_watches(OUT_EVENTS);
306         return true;
307     }
308
309     if (! service->has_lone_ref() || service->get_state() != service_state_t::STOPPED) {
310         // Cannot unload: has other references
311         char nak_rep[] = { DINIT_RP_NAK };
312         if (! queue_packet(nak_rep, 1)) return false;
313     }
314     else {
315         // unload
316         service->prepare_for_unload();
317         services->remove_service(service);
318         delete service;
319
320         // drop handle
321         service_key_map.erase(service);
322         key_service_map.erase(handle);
323
324         // send ack
325         char ack_buf[] = { (char) DINIT_RP_ACK };
326         if (! queue_packet(ack_buf, 1)) return false;
327     }
328
329     // Clear the packet from the buffer
330     rbuf.consume(pkt_size);
331     chklen = 0;
332     return true;
333 }
334
335 bool control_conn_t::list_services()
336 {
337     rbuf.consume(1); // clear request packet
338     chklen = 0;
339     
340     try {
341         auto slist = services->list_services();
342         for (auto sptr : slist) {
343             std::vector<char> pkt_buf;
344             
345             int hdrsize = 8 + std::max(sizeof(int), sizeof(pid_t));
346
347             const std::string &name = sptr->get_name();
348             int nameLen = std::min((size_t)256, name.length());
349             pkt_buf.resize(hdrsize + nameLen);
350             
351             pkt_buf[0] = DINIT_RP_SVCINFO;
352             pkt_buf[1] = nameLen;
353             pkt_buf[2] = static_cast<char>(sptr->get_state());
354             pkt_buf[3] = static_cast<char>(sptr->get_target_state());
355             
356             char b0 = sptr->is_waiting_for_console() ? 1 : 0;
357             b0 |= sptr->has_console() ? 2 : 0;
358             b0 |= sptr->was_start_skipped() ? 4 : 0;
359             pkt_buf[4] = b0;
360             pkt_buf[5] = static_cast<char>(sptr->get_stop_reason());
361
362             pkt_buf[6] = 0; // reserved
363             pkt_buf[7] = 0;
364             
365             // Next: either the exit status, or the process ID
366             if (sptr->get_state() != service_state_t::STOPPED) {
367                 pid_t proc_pid = sptr->get_pid();
368                 memcpy(pkt_buf.data() + 8, &proc_pid, sizeof(proc_pid));
369             }
370             else {
371                 int exit_status = sptr->get_exit_status();
372                 memcpy(pkt_buf.data() + 8, &exit_status, sizeof(exit_status));
373             }
374
375             for (int i = 0; i < nameLen; i++) {
376                 pkt_buf[hdrsize+i] = name[i];
377             }
378             
379             if (! queue_packet(std::move(pkt_buf))) return false;
380         }
381         
382         char ack_buf[] = { (char) DINIT_RP_LISTDONE };
383         if (! queue_packet(ack_buf, 1)) return false;
384         
385         return true;
386     }
387     catch (std::bad_alloc &exc)
388     {
389         do_oom_close();
390         return true;
391     }
392 }
393
394 bool control_conn_t::add_service_dep()
395 {
396     // 1 byte packet type
397     // 1 byte dependency type
398     // handle: "from"
399     // handle: "to"
400
401     constexpr int pkt_size = 2 + sizeof(handle_t) * 2;
402
403     if (rbuf.get_length() < pkt_size) {
404         chklen = pkt_size;
405         return true;
406     }
407
408     handle_t from_handle;
409     handle_t to_handle;
410     rbuf.extract((char *) &from_handle, 2, sizeof(from_handle));
411     rbuf.extract((char *) &to_handle, 2 + sizeof(from_handle), sizeof(to_handle));
412
413     service_record *from_service = find_service_for_key(from_handle);
414     service_record *to_service = find_service_for_key(to_handle);
415     if (from_service == nullptr || to_service == nullptr || from_service == to_service) {
416         // Service handle is bad
417         char badreq_rep[] = { DINIT_RP_BADREQ };
418         if (! queue_packet(badreq_rep, 1)) return false;
419         bad_conn_close = true;
420         iob.set_watches(OUT_EVENTS);
421         return true;
422     }
423
424     // Check dependency type is valid:
425     int dep_type_int = rbuf[1];
426     if (! contains({(int)dependency_type::MILESTONE, (int)dependency_type::REGULAR,
427             (int)dependency_type::WAITS_FOR}, dep_type_int)) {
428         char badreqRep[] = { DINIT_RP_BADREQ };
429         if (! queue_packet(badreqRep, 1)) return false;
430         bad_conn_close = true;
431         iob.set_watches(OUT_EVENTS);
432     }
433     dependency_type dep_type = static_cast<dependency_type>(dep_type_int);
434
435     // Check current service states are valid for given dep type
436     if (dep_type == dependency_type::REGULAR) {
437         if (from_service->get_state() != service_state_t::STOPPED &&
438                 to_service->get_state() != service_state_t::STARTED) {
439             // Cannot create dependency now since it would be contradicted:
440             char nak_rep[] = { DINIT_RP_NAK };
441             if (! queue_packet(nak_rep, 1)) return false;
442             rbuf.consume(pkt_size);
443             chklen = 0;
444             return true;
445         }
446     }
447
448     // Check for creation of circular dependency chain
449     std::unordered_set<service_record *> dep_marks;
450     std::vector<service_record *> dep_queue;
451     dep_queue.push_back(to_service);
452     while (! dep_queue.empty()) {
453         service_record * sr = dep_queue.back();
454         dep_queue.pop_back();
455         // iterate deps; if dep == from, abort; otherwise add to set/queue
456         // (only add to queue if not already in set)
457         for (auto &dep : sr->get_dependencies()) {
458             service_record * dep_to = dep.get_to();
459             if (dep_to == from_service) {
460                 // fail, circular dependency!
461                 char nak_rep[] = { DINIT_RP_NAK };
462                 if (! queue_packet(nak_rep, 1)) return false;
463                 rbuf.consume(pkt_size);
464                 chklen = 0;
465                 return true;
466             }
467             if (dep_marks.insert(dep_to).second) {
468                 dep_queue.push_back(dep_to);
469             }
470         }
471     }
472     dep_marks.clear();
473     dep_queue.clear();
474
475     // Prevent creation of duplicate dependency:
476     for (auto &dep : from_service->get_dependencies()) {
477         service_record * dep_to = dep.get_to();
478         if (dep_to == to_service && dep.dep_type == dep_type) {
479             // Dependency already exists: return success
480             char ack_rep[] = { DINIT_RP_ACK };
481             if (! queue_packet(ack_rep, 1)) return false;
482             rbuf.consume(pkt_size);
483             chklen = 0;
484             return true;
485         }
486     }
487
488     // Create dependency:
489     from_service->add_dep(to_service, dep_type);
490     services->process_queues();
491
492     char ack_rep[] = { DINIT_RP_ACK };
493     if (! queue_packet(ack_rep, 1)) return false;
494     rbuf.consume(pkt_size);
495     chklen = 0;
496     return true;
497 }
498
499 bool control_conn_t::rm_service_dep()
500 {
501     // 1 byte packet type
502     // 1 byte dependency type
503     // handle: "from"
504     // handle: "to"
505
506     constexpr int pkt_size = 2 + sizeof(handle_t) * 2;
507
508     if (rbuf.get_length() < pkt_size) {
509         chklen = pkt_size;
510         return true;
511     }
512
513     handle_t from_handle;
514     handle_t to_handle;
515     rbuf.extract((char *) &from_handle, 2, sizeof(from_handle));
516     rbuf.extract((char *) &to_handle, 2 + sizeof(from_handle), sizeof(to_handle));
517
518     service_record *from_service = find_service_for_key(from_handle);
519     service_record *to_service = find_service_for_key(to_handle);
520     if (from_service == nullptr || to_service == nullptr || from_service == to_service) {
521         // Service handle is bad
522         char badreq_rep[] = { DINIT_RP_BADREQ };
523         if (! queue_packet(badreq_rep, 1)) return false;
524         bad_conn_close = true;
525         iob.set_watches(OUT_EVENTS);
526         return true;
527     }
528
529     // Check dependency type is valid:
530     int dep_type_int = rbuf[1];
531     if (! contains({(int)dependency_type::MILESTONE, (int)dependency_type::REGULAR,
532             (int)dependency_type::WAITS_FOR}, dep_type_int)) {
533         char badreqRep[] = { DINIT_RP_BADREQ };
534         if (! queue_packet(badreqRep, 1)) return false;
535         bad_conn_close = true;
536         iob.set_watches(OUT_EVENTS);
537     }
538     dependency_type dep_type = static_cast<dependency_type>(dep_type_int);
539
540     // Remove dependency:
541     from_service->rm_dep(to_service, dep_type);
542     services->process_queues();
543
544     char ack_rep[] = { DINIT_RP_ACK };
545     if (! queue_packet(ack_rep, 1)) return false;
546     rbuf.consume(pkt_size);
547     chklen = 0;
548     return true;
549 }
550
551 control_conn_t::handle_t control_conn_t::allocate_service_handle(service_record *record)
552 {
553     // Try to find a unique handle (integer) in a single pass. Since the map is ordered, we can search until
554     // we find a gap in the handle values.
555     handle_t candidate = 0;
556     for (auto p : key_service_map) {
557         if (p.first == candidate) candidate++;
558         else break;
559     }
560
561     bool is_unique = (service_key_map.find(record) == service_key_map.end());
562
563     // The following operations perform allocation (can throw std::bad_alloc). If an exception occurs we
564     // must undo any previous actions:
565     if (is_unique) {
566         record->add_listener(this);
567     }
568     
569     try {
570         key_service_map[candidate] = record;
571         service_key_map.insert(std::make_pair(record, candidate));
572     }
573     catch (...) {
574         if (is_unique) {
575             record->remove_listener(this);
576         }
577
578         key_service_map.erase(candidate);
579     }
580     
581     return candidate;
582 }
583
584 bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept
585 {
586     int in_flag = bad_conn_close ? 0 : IN_EVENTS;
587     bool was_empty = outbuf.empty();
588
589     // If the queue is empty, we can try to write the packet out now rather than queueing it.
590     // If the write is unsuccessful or partial, we queue the remainder.
591     if (was_empty) {
592         int wr = bp_sys::write(iob.get_watched_fd(), pkt, size);
593         if (wr == -1) {
594             if (errno == EPIPE) {
595                 return false;
596             }
597             if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
598                 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
599                 return false;
600             }
601             // EAGAIN etc: fall through to below
602         }
603         else {
604             if ((unsigned)wr == size) {
605                 // Ok, all written.
606                 iob.set_watches(in_flag);
607                 return true;
608             }
609             pkt += wr;
610             size -= wr;
611         }
612     }
613     
614     // Create a vector out of the (remaining part of the) packet:
615     try {
616         outbuf.emplace_back(pkt, pkt + size);
617         iob.set_watches(in_flag | OUT_EVENTS);
618         return true;
619     }
620     catch (std::bad_alloc &baexc) {
621         // Mark the connection bad, and stop reading further requests
622         bad_conn_close = true;
623         oom_close = true;
624         if (was_empty) {
625             // We can't send out-of-memory response as we already wrote as much as we
626             // could above. Neither can we later send the response since we have currently
627             // sent an incomplete packet. All we can do is close the connection.
628             return false;
629         }
630         else {
631             iob.set_watches(OUT_EVENTS);
632             return true;
633         }
634     }
635 }
636
637 // This queue_packet method is frustratingly similar to the one above, but the subtle differences
638 // make them extraordinary difficult to combine into a single method.
639 bool control_conn_t::queue_packet(std::vector<char> &&pkt) noexcept
640 {
641     int in_flag = bad_conn_close ? 0 : IN_EVENTS;
642     bool was_empty = outbuf.empty();
643     
644     if (was_empty) {
645         outpkt_index = 0;
646         // We can try sending the packet immediately:
647         int wr = bp_sys::write(iob.get_watched_fd(), pkt.data(), pkt.size());
648         if (wr == -1) {
649             if (errno == EPIPE) {
650                 return false;
651             }
652             if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
653                 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
654                 return false;
655             }
656             // EAGAIN etc: fall through to below
657         }
658         else {
659             if ((unsigned)wr == pkt.size()) {
660                 // Ok, all written.
661                 iob.set_watches(in_flag);
662                 return true;
663             }
664             outpkt_index = wr;
665         }
666     }
667     
668     try {
669         outbuf.emplace_back(pkt);
670         iob.set_watches(in_flag | OUT_EVENTS);
671         return true;
672     }
673     catch (std::bad_alloc &baexc) {
674         // Mark the connection bad, and stop reading further requests
675         bad_conn_close = true;
676         oom_close = true;
677         if (was_empty) {
678             // We can't send out-of-memory response as we already wrote as much as we
679             // could above. Neither can we later send the response since we have currently
680             // sent an incomplete packet. All we can do is close the connection.
681             return false;
682         }
683         else {
684             iob.set_watches(OUT_EVENTS);
685             return true;
686         }
687     }
688 }
689
690 bool control_conn_t::rollback_complete() noexcept
691 {
692     char ackBuf[2] = { DINIT_ROLLBACK_COMPLETED, 2 };
693     return queue_packet(ackBuf, 2);
694 }
695
696 bool control_conn_t::data_ready() noexcept
697 {
698     int fd = iob.get_watched_fd();
699     
700     int r = rbuf.fill(fd);
701     
702     // Note file descriptor is non-blocking
703     if (r == -1) {
704         if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
705             log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
706             return true;
707         }
708         return false;
709     }
710     
711     if (r == 0) {
712         return true;
713     }
714     
715     // complete packet?
716     if (rbuf.get_length() >= chklen) {
717         try {
718             return !process_packet();
719         }
720         catch (std::bad_alloc &baexc) {
721             do_oom_close();
722             return false;
723         }
724     }
725     else if (rbuf.get_length() == 1024) {
726         // Too big packet
727         log(loglevel_t::WARN, "Received too-large control package; dropping connection");
728         bad_conn_close = true;
729         iob.set_watches(OUT_EVENTS);
730     }
731     else {
732         int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0;
733         iob.set_watches(IN_EVENTS | out_flags);
734     }
735     
736     return false;
737 }
738
739 bool control_conn_t::send_data() noexcept
740 {
741     if (outbuf.empty() && bad_conn_close) {
742         if (oom_close) {
743             // Send oom response
744             char oomBuf[] = { DINIT_RP_OOM };
745             bp_sys::write(iob.get_watched_fd(), oomBuf, 1);
746         }
747         return true;
748     }
749     
750     vector<char> & pkt = outbuf.front();
751     char *data = pkt.data();
752     int written = bp_sys::write(iob.get_watched_fd(), data + outpkt_index, pkt.size() - outpkt_index);
753     if (written == -1) {
754         if (errno == EPIPE) {
755             // read end closed
756             return true;
757         }
758         else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
759             // spurious readiness notification?
760         }
761         else {
762             log(loglevel_t::ERROR, "Error writing to control connection: ", strerror(errno));
763             return true;
764         }
765         return false;
766     }
767
768     outpkt_index += written;
769     if (outpkt_index == pkt.size()) {
770         // We've finished this packet, move on to the next:
771         outbuf.pop_front();
772         outpkt_index = 0;
773         if (outbuf.empty() && ! oom_close) {
774             if (! bad_conn_close) {
775                 iob.set_watches(IN_EVENTS);
776             }
777             else {
778                 return true;
779             }
780         }
781     }
782     
783     return false;
784 }
785
786 control_conn_t::~control_conn_t() noexcept
787 {
788     bp_sys::close(iob.get_watched_fd());
789     iob.deregister(loop);
790     
791     // Clear service listeners
792     for (auto p : service_key_map) {
793         p.first->remove_listener(this);
794     }
795     
796     active_control_conns--;
797 }