Add initial control protocol test.
[oweals/dinit.git] / src / control.cc
1 #include <algorithm>
2 #include <unordered_set>
3
4 #include "control.h"
5 #include "service.h"
6
7 // Server-side control protocol implementation. This implements the functionality that allows
8 // clients (such as dinitctl) to query service state and issue commands to control services.
9
10 namespace {
11     constexpr auto OUT_EVENTS = dasynq::OUT_EVENTS;
12     constexpr auto IN_EVENTS = dasynq::IN_EVENTS;
13
14     // Control protocol minimum compatible version and current version:
15     constexpr uint16_t min_compat_version = 1;
16     constexpr uint16_t cp_version = 1;
17 }
18
19 bool control_conn_t::process_packet()
20 {
21     using std::string;
22     
23     // Note that where we call queue_packet, we must generally check the return value. If it
24     // returns false it has either deleted the connection or marked it for deletion; we
25     // shouldn't touch instance members after that point.
26
27     int pktType = rbuf[0];
28     if (pktType == DINIT_CP_QUERYVERSION) {
29         // Responds with:
30         // DINIT_RP_CVERSION, (2 byte) minimum compatible version, (2 byte) actual version
31         char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 };
32         memcpy(replyBuf + 1, &min_compat_version, 2);
33         memcpy(replyBuf + 3, &cp_version, 2);
34         if (! queue_packet(replyBuf, sizeof(replyBuf))) return false;
35         rbuf.consume(1);
36         return true;
37     }
38     if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) {
39         return process_find_load(pktType);
40     }
41     if (pktType == DINIT_CP_STARTSERVICE || pktType == DINIT_CP_STOPSERVICE
42             || pktType == DINIT_CP_WAKESERVICE || pktType == DINIT_CP_RELEASESERVICE) {
43         return process_start_stop(pktType);
44     }
45     if (pktType == DINIT_CP_UNPINSERVICE) {
46         return process_unpin_service();
47     }
48     if (pktType == DINIT_CP_UNLOADSERVICE) {
49         return process_unload_service();
50     }
51     if (pktType == DINIT_CP_SHUTDOWN) {
52         // Shutdown/reboot
53         if (rbuf.get_length() < 2) {
54             chklen = 2;
55             return true;
56         }
57         
58         auto sd_type = static_cast<shutdown_type_t>(rbuf[1]);
59         
60         services->stop_all_services(sd_type);
61         char ackBuf[] = { DINIT_RP_ACK };
62         if (! queue_packet(ackBuf, 1)) return false;
63         
64         // Clear the packet from the buffer
65         rbuf.consume(2);
66         chklen = 0;
67         return true;
68     }
69     if (pktType == DINIT_CP_LISTSERVICES) {
70         return list_services();
71     }
72     if (pktType == DINIT_CP_ADD_DEP) {
73         return add_service_dep();
74     }
75     if (pktType == DINIT_CP_REM_DEP) {
76         return rm_service_dep();
77     }
78
79     // Unrecognized: give error response
80     char outbuf[] = { DINIT_RP_BADREQ };
81     if (! queue_packet(outbuf, 1)) return false;
82     bad_conn_close = true;
83     iob.set_watches(OUT_EVENTS);
84     return true;
85 }
86
87 bool control_conn_t::process_find_load(int pktType)
88 {
89     using std::string;
90     
91     constexpr int pkt_size = 4;
92     
93     if (rbuf.get_length() < pkt_size) {
94         chklen = pkt_size;
95         return true;
96     }
97     
98     uint16_t svcSize;
99     rbuf.extract((char *)&svcSize, 1, 2);
100     chklen = svcSize + 3; // packet type + (2 byte) length + service name
101     if (svcSize <= 0 || chklen > 1024) {
102         // Queue error response / mark connection bad
103         char badreqRep[] = { DINIT_RP_BADREQ };
104         if (! queue_packet(badreqRep, 1)) return false;
105         bad_conn_close = true;
106         iob.set_watches(OUT_EVENTS);
107         return true;
108     }
109     
110     if (rbuf.get_length() < chklen) {
111         // packet not complete yet; read more
112         return true;
113     }
114     
115     service_record * record = nullptr;
116     
117     string serviceName = rbuf.extract_string(3, svcSize);
118     
119     if (pktType == DINIT_CP_LOADSERVICE) {
120         // LOADSERVICE
121         try {
122             record = services->load_service(serviceName.c_str());
123         }
124         catch (service_load_exc &slexc) {
125             log(loglevel_t::ERROR, "Could not load service ", slexc.serviceName, ": ", slexc.excDescription);
126         }
127     }
128     else {
129         // FINDSERVICE
130         record = services->find_service(serviceName.c_str());
131     }
132     
133     if (record != nullptr) {
134         // Allocate a service handle
135         handle_t handle = allocate_service_handle(record);
136         std::vector<char> rp_buf;
137         rp_buf.reserve(7);
138         rp_buf.push_back(DINIT_RP_SERVICERECORD);
139         rp_buf.push_back(static_cast<char>(record->get_state()));
140         for (int i = 0; i < (int) sizeof(handle); i++) {
141             rp_buf.push_back(*(((char *) &handle) + i));
142         }
143         rp_buf.push_back(static_cast<char>(record->get_target_state()));
144         if (! queue_packet(std::move(rp_buf))) return false;
145     }
146     else {
147         std::vector<char> rp_buf = { DINIT_RP_NOSERVICE };
148         if (! queue_packet(std::move(rp_buf))) return false;
149     }
150     
151     // Clear the packet from the buffer
152     rbuf.consume(chklen);
153     chklen = 0;
154     return true;
155 }
156
157 bool control_conn_t::process_start_stop(int pktType)
158 {
159     using std::string;
160     
161     constexpr int pkt_size = 2 + sizeof(handle_t);
162     
163     if (rbuf.get_length() < pkt_size) {
164         chklen = pkt_size;
165         return true;
166     }
167     
168     // 1 byte: packet type
169     // 1 byte: pin in requested state (0 = no pin, 1 = pin)
170     // 4 bytes: service handle
171     
172     bool do_pin = (rbuf[1] == 1);
173     handle_t handle;
174     rbuf.extract((char *) &handle, 2, sizeof(handle));
175     
176     service_record *service = find_service_for_key(handle);
177     if (service == nullptr) {
178         // Service handle is bad
179         char badreqRep[] = { DINIT_RP_BADREQ };
180         if (! queue_packet(badreqRep, 1)) return false;
181         bad_conn_close = true;
182         iob.set_watches(OUT_EVENTS);
183         return true;
184     }
185     else {
186         bool already_there = false;
187         
188         switch (pktType) {
189         case DINIT_CP_STARTSERVICE:
190             // start service, mark as required
191             if (do_pin) service->pin_start();
192             service->start();
193             services->process_queues();
194             already_there = service->get_state() == service_state_t::STARTED;
195             break;
196         case DINIT_CP_STOPSERVICE:
197             // force service to stop
198             if (do_pin) service->pin_stop();
199             service->stop(true);
200             service->forced_stop();
201             services->process_queues();
202             already_there = service->get_state() == service_state_t::STOPPED;
203             break;
204         case DINIT_CP_WAKESERVICE:
205             // re-start a stopped service (do not mark as required)
206             if (do_pin) service->pin_start();
207             service->start(false);
208             services->process_queues();
209             already_there = service->get_state() == service_state_t::STARTED;
210             break;
211         case DINIT_CP_RELEASESERVICE:
212             // remove required mark, stop if not required by dependents
213             if (do_pin) service->pin_stop();
214             service->stop(false);
215             services->process_queues();
216             already_there = service->get_state() == service_state_t::STOPPED;
217             break;
218         }
219         
220         char ack_buf[] = { (char)(already_there ? DINIT_RP_ALREADYSS : DINIT_RP_ACK) };
221         
222         if (! queue_packet(ack_buf, 1)) return false;
223     }
224     
225     // Clear the packet from the buffer
226     rbuf.consume(pkt_size);
227     chklen = 0;
228     return true;
229 }
230
231 bool control_conn_t::process_unpin_service()
232 {
233     using std::string;
234     
235     constexpr int pkt_size = 1 + sizeof(handle_t);
236     
237     if (rbuf.get_length() < pkt_size) {
238         chklen = pkt_size;
239         return true;
240     }
241     
242     // 1 byte: packet type
243     // 4 bytes: service handle
244     
245     handle_t handle;
246     rbuf.extract((char *) &handle, 1, sizeof(handle));
247     
248     service_record *service = find_service_for_key(handle);
249     if (service == nullptr) {
250         // Service handle is bad
251         char badreqRep[] = { DINIT_RP_BADREQ };
252         if (! queue_packet(badreqRep, 1)) return false;
253         bad_conn_close = true;
254         iob.set_watches(OUT_EVENTS);
255         return true;
256     }
257
258     service->unpin();
259     services->process_queues();
260     char ack_buf[] = { (char) DINIT_RP_ACK };
261     if (! queue_packet(ack_buf, 1)) return false;
262     
263     // Clear the packet from the buffer
264     rbuf.consume(pkt_size);
265     chklen = 0;
266     return true;
267 }
268
269 bool control_conn_t::process_unload_service()
270 {
271     using std::string;
272
273     constexpr int pkt_size = 1 + sizeof(handle_t);
274
275     if (rbuf.get_length() < pkt_size) {
276         chklen = pkt_size;
277         return true;
278     }
279
280     // 1 byte: packet type
281     // 4 bytes: service handle
282
283     handle_t handle;
284     rbuf.extract((char *) &handle, 1, sizeof(handle));
285
286     service_record *service = find_service_for_key(handle);
287     if (service == nullptr) {
288         // Service handle is bad
289         char badreq_rep[] = { DINIT_RP_BADREQ };
290         if (! queue_packet(badreq_rep, 1)) return false;
291         bad_conn_close = true;
292         iob.set_watches(OUT_EVENTS);
293         return true;
294     }
295
296     if (! service->has_lone_ref() || service->get_state() != service_state_t::STOPPED) {
297         // Cannot unload: has other references
298         char nak_rep[] = { DINIT_RP_NAK };
299         if (! queue_packet(nak_rep, 1)) return false;
300     }
301     else {
302         // unload
303         service->prepare_for_unload();
304         services->remove_service(service);
305         delete service;
306
307         // drop handle
308         service_key_map.erase(service);
309         key_service_map.erase(handle);
310
311         // send ack
312         char ack_buf[] = { (char) DINIT_RP_ACK };
313         if (! queue_packet(ack_buf, 1)) return false;
314     }
315
316     // Clear the packet from the buffer
317     rbuf.consume(pkt_size);
318     chklen = 0;
319     return true;
320 }
321
322 bool control_conn_t::list_services()
323 {
324     rbuf.consume(1); // clear request packet
325     chklen = 0;
326     
327     try {
328         auto slist = services->list_services();
329         for (auto sptr : slist) {
330             std::vector<char> pkt_buf;
331             
332             int hdrsize = 8 + std::max(sizeof(int), sizeof(pid_t));
333
334             const std::string &name = sptr->get_name();
335             int nameLen = std::min((size_t)256, name.length());
336             pkt_buf.resize(hdrsize + nameLen);
337             
338             pkt_buf[0] = DINIT_RP_SVCINFO;
339             pkt_buf[1] = nameLen;
340             pkt_buf[2] = static_cast<char>(sptr->get_state());
341             pkt_buf[3] = static_cast<char>(sptr->get_target_state());
342             
343             char b0 = sptr->is_waiting_for_console() ? 1 : 0;
344             b0 |= sptr->has_console() ? 2 : 0;
345             b0 |= sptr->was_start_skipped() ? 4 : 0;
346             pkt_buf[4] = b0;
347             pkt_buf[5] = static_cast<char>(sptr->get_stop_reason());
348
349             pkt_buf[6] = 0; // reserved
350             pkt_buf[7] = 0;
351             
352             // Next: either the exit status, or the process ID
353             if (sptr->get_state() != service_state_t::STOPPED) {
354                 pid_t proc_pid = sptr->get_pid();
355                 memcpy(pkt_buf.data() + 8, &proc_pid, sizeof(proc_pid));
356             }
357             else {
358                 int exit_status = sptr->get_exit_status();
359                 memcpy(pkt_buf.data() + 8, &exit_status, sizeof(exit_status));
360             }
361
362             for (int i = 0; i < nameLen; i++) {
363                 pkt_buf[hdrsize+i] = name[i];
364             }
365             
366             if (! queue_packet(std::move(pkt_buf))) return false;
367         }
368         
369         char ack_buf[] = { (char) DINIT_RP_LISTDONE };
370         if (! queue_packet(ack_buf, 1)) return false;
371         
372         return true;
373     }
374     catch (std::bad_alloc &exc)
375     {
376         do_oom_close();
377         return true;
378     }
379 }
380
381 // check for value in a set
382 static inline bool contains(const int (&v)[3], int i)
383 {
384     return std::find(std::begin(v), std::end(v), i) != std::end(v);
385 }
386
387 bool control_conn_t::add_service_dep()
388 {
389     // 1 byte packet type
390     // 1 byte dependency type
391     // handle: "from"
392     // handle: "to"
393
394     constexpr int pkt_size = 2 + sizeof(handle_t) * 2;
395
396     if (rbuf.get_length() < pkt_size) {
397         chklen = pkt_size;
398         return true;
399     }
400
401     handle_t from_handle;
402     handle_t to_handle;
403     rbuf.extract((char *) &from_handle, 2, sizeof(from_handle));
404     rbuf.extract((char *) &to_handle, 2 + sizeof(from_handle), sizeof(to_handle));
405
406     service_record *from_service = find_service_for_key(from_handle);
407     service_record *to_service = find_service_for_key(to_handle);
408     if (from_service == nullptr || to_service == nullptr || from_service == to_service) {
409         // Service handle is bad
410         char badreq_rep[] = { DINIT_RP_BADREQ };
411         if (! queue_packet(badreq_rep, 1)) return false;
412         bad_conn_close = true;
413         iob.set_watches(OUT_EVENTS);
414         return true;
415     }
416
417     // Check dependency type is valid:
418     int dep_type_int = rbuf[1];
419     if (! contains({(int)dependency_type::MILESTONE, (int)dependency_type::REGULAR,
420             (int)dependency_type::WAITS_FOR}, dep_type_int)) {
421         char badreqRep[] = { DINIT_RP_BADREQ };
422         if (! queue_packet(badreqRep, 1)) return false;
423         bad_conn_close = true;
424         iob.set_watches(OUT_EVENTS);
425     }
426     dependency_type dep_type = static_cast<dependency_type>(dep_type_int);
427
428     // Check current service states are valid for given dep type
429     if (dep_type == dependency_type::REGULAR) {
430         if (from_service->get_state() != service_state_t::STOPPED &&
431                 to_service->get_state() != service_state_t::STARTED) {
432             // Cannot create dependency now since it would be contradicted:
433             char nak_rep[] = { DINIT_RP_NAK };
434             if (! queue_packet(nak_rep, 1)) return false;
435             rbuf.consume(pkt_size);
436             chklen = 0;
437             return true;
438         }
439     }
440
441     // Check for creation of circular dependency chain
442     std::unordered_set<service_record *> dep_marks;
443     std::vector<service_record *> dep_queue;
444     dep_queue.push_back(to_service);
445     while (! dep_queue.empty()) {
446         service_record * sr = dep_queue.back();
447         dep_queue.pop_back();
448         // iterate deps; if dep == from, abort; otherwise add to set/queue
449         // (only add to queue if not already in set)
450         for (auto &dep : sr->get_dependencies()) {
451             service_record * dep_to = dep.get_to();
452             if (dep_to == from_service) {
453                 // fail, circular dependency!
454                 char nak_rep[] = { DINIT_RP_NAK };
455                 if (! queue_packet(nak_rep, 1)) return false;
456                 rbuf.consume(pkt_size);
457                 chklen = 0;
458                 return true;
459             }
460             if (dep_marks.insert(dep_to).second) {
461                 dep_queue.push_back(dep_to);
462             }
463         }
464     }
465     dep_marks.clear();
466     dep_queue.clear();
467
468     // Prevent creation of duplicate dependency:
469     for (auto &dep : from_service->get_dependencies()) {
470         service_record * dep_to = dep.get_to();
471         if (dep_to == to_service && dep.dep_type == dep_type) {
472             // Dependency already exists: return success
473             char ack_rep[] = { DINIT_RP_ACK };
474             if (! queue_packet(ack_rep, 1)) return false;
475             rbuf.consume(pkt_size);
476             chklen = 0;
477             return true;
478         }
479     }
480
481     // Create dependency:
482     from_service->add_dep(to_service, dep_type);
483     services->process_queues();
484
485     char ack_rep[] = { DINIT_RP_ACK };
486     if (! queue_packet(ack_rep, 1)) return false;
487     rbuf.consume(pkt_size);
488     chklen = 0;
489     return true;
490 }
491
492 bool control_conn_t::rm_service_dep()
493 {
494     // 1 byte packet type
495     // 1 byte dependency type
496     // handle: "from"
497     // handle: "to"
498
499     constexpr int pkt_size = 2 + sizeof(handle_t) * 2;
500
501     if (rbuf.get_length() < pkt_size) {
502         chklen = pkt_size;
503         return true;
504     }
505
506     handle_t from_handle;
507     handle_t to_handle;
508     rbuf.extract((char *) &from_handle, 2, sizeof(from_handle));
509     rbuf.extract((char *) &to_handle, 2 + sizeof(from_handle), sizeof(to_handle));
510
511     service_record *from_service = find_service_for_key(from_handle);
512     service_record *to_service = find_service_for_key(to_handle);
513     if (from_service == nullptr || to_service == nullptr || from_service == to_service) {
514         // Service handle is bad
515         char badreq_rep[] = { DINIT_RP_BADREQ };
516         if (! queue_packet(badreq_rep, 1)) return false;
517         bad_conn_close = true;
518         iob.set_watches(OUT_EVENTS);
519         return true;
520     }
521
522     // Check dependency type is valid:
523     int dep_type_int = rbuf[1];
524     if (! contains({(int)dependency_type::MILESTONE, (int)dependency_type::REGULAR,
525             (int)dependency_type::WAITS_FOR}, dep_type_int)) {
526         char badreqRep[] = { DINIT_RP_BADREQ };
527         if (! queue_packet(badreqRep, 1)) return false;
528         bad_conn_close = true;
529         iob.set_watches(OUT_EVENTS);
530     }
531     dependency_type dep_type = static_cast<dependency_type>(dep_type_int);
532
533     // Remove dependency:
534     from_service->rm_dep(to_service, dep_type);
535     services->process_queues();
536
537     char ack_rep[] = { DINIT_RP_ACK };
538     if (! queue_packet(ack_rep, 1)) return false;
539     rbuf.consume(pkt_size);
540     chklen = 0;
541     return true;
542 }
543
544 control_conn_t::handle_t control_conn_t::allocate_service_handle(service_record *record)
545 {
546     // Try to find a unique handle (integer) in a single pass. Since the map is ordered, we can search until
547     // we find a gap in the handle values.
548     handle_t candidate = 0;
549     for (auto p : key_service_map) {
550         if (p.first == candidate) candidate++;
551         else break;
552     }
553
554     bool is_unique = (service_key_map.find(record) == service_key_map.end());
555
556     // The following operations perform allocation (can throw std::bad_alloc). If an exception occurs we
557     // must undo any previous actions:
558     if (is_unique) {
559         record->add_listener(this);
560     }
561     
562     try {
563         key_service_map[candidate] = record;
564         service_key_map.insert(std::make_pair(record, candidate));
565     }
566     catch (...) {
567         if (is_unique) {
568             record->remove_listener(this);
569         }
570
571         key_service_map.erase(candidate);
572     }
573     
574     return candidate;
575 }
576
577 bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept
578 {
579     int in_flag = bad_conn_close ? 0 : IN_EVENTS;
580     bool was_empty = outbuf.empty();
581
582     // If the queue is empty, we can try to write the packet out now rather than queueing it.
583     // If the write is unsuccessful or partial, we queue the remainder.
584     if (was_empty) {
585         int wr = bp_sys::write(iob.get_watched_fd(), pkt, size);
586         if (wr == -1) {
587             if (errno == EPIPE) {
588                 return false;
589             }
590             if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
591                 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
592                 return false;
593             }
594             // EAGAIN etc: fall through to below
595         }
596         else {
597             if ((unsigned)wr == size) {
598                 // Ok, all written.
599                 iob.set_watches(in_flag);
600                 return true;
601             }
602             pkt += wr;
603             size -= wr;
604         }
605     }
606     
607     // Create a vector out of the (remaining part of the) packet:
608     try {
609         outbuf.emplace_back(pkt, pkt + size);
610         iob.set_watches(in_flag | OUT_EVENTS);
611         return true;
612     }
613     catch (std::bad_alloc &baexc) {
614         // Mark the connection bad, and stop reading further requests
615         bad_conn_close = true;
616         oom_close = true;
617         if (was_empty) {
618             // We can't send out-of-memory response as we already wrote as much as we
619             // could above. Neither can we later send the response since we have currently
620             // sent an incomplete packet. All we can do is close the connection.
621             return false;
622         }
623         else {
624             iob.set_watches(OUT_EVENTS);
625             return true;
626         }
627     }
628 }
629
630 // This queue_packet method is frustratingly similar to the one above, but the subtle differences
631 // make them extraordinary difficult to combine into a single method.
632 bool control_conn_t::queue_packet(std::vector<char> &&pkt) noexcept
633 {
634     int in_flag = bad_conn_close ? 0 : IN_EVENTS;
635     bool was_empty = outbuf.empty();
636     
637     if (was_empty) {
638         outpkt_index = 0;
639         // We can try sending the packet immediately:
640         int wr = bp_sys::write(iob.get_watched_fd(), pkt.data(), pkt.size());
641         if (wr == -1) {
642             if (errno == EPIPE) {
643                 return false;
644             }
645             if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
646                 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
647                 return false;
648             }
649             // EAGAIN etc: fall through to below
650         }
651         else {
652             if ((unsigned)wr == pkt.size()) {
653                 // Ok, all written.
654                 iob.set_watches(in_flag);
655                 return true;
656             }
657             outpkt_index = wr;
658         }
659     }
660     
661     try {
662         outbuf.emplace_back(pkt);
663         iob.set_watches(in_flag | OUT_EVENTS);
664         return true;
665     }
666     catch (std::bad_alloc &baexc) {
667         // Mark the connection bad, and stop reading further requests
668         bad_conn_close = true;
669         oom_close = true;
670         if (was_empty) {
671             // We can't send out-of-memory response as we already wrote as much as we
672             // could above. Neither can we later send the response since we have currently
673             // sent an incomplete packet. All we can do is close the connection.
674             return false;
675         }
676         else {
677             iob.set_watches(OUT_EVENTS);
678             return true;
679         }
680     }
681 }
682
683 bool control_conn_t::rollback_complete() noexcept
684 {
685     char ackBuf[2] = { DINIT_ROLLBACK_COMPLETED, 2 };
686     return queue_packet(ackBuf, 2);
687 }
688
689 bool control_conn_t::data_ready() noexcept
690 {
691     int fd = iob.get_watched_fd();
692     
693     int r = rbuf.fill(fd);
694     
695     // Note file descriptor is non-blocking
696     if (r == -1) {
697         if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
698             log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
699             return true;
700         }
701         return false;
702     }
703     
704     if (r == 0) {
705         return true;
706     }
707     
708     // complete packet?
709     if (rbuf.get_length() >= chklen) {
710         try {
711             return !process_packet();
712         }
713         catch (std::bad_alloc &baexc) {
714             do_oom_close();
715             return false;
716         }
717     }
718     else if (rbuf.get_length() == 1024) {
719         // Too big packet
720         log(loglevel_t::WARN, "Received too-large control package; dropping connection");
721         bad_conn_close = true;
722         iob.set_watches(OUT_EVENTS);
723     }
724     else {
725         int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0;
726         iob.set_watches(IN_EVENTS | out_flags);
727     }
728     
729     return false;
730 }
731
732 bool control_conn_t::send_data() noexcept
733 {
734     if (outbuf.empty() && bad_conn_close) {
735         if (oom_close) {
736             // Send oom response
737             char oomBuf[] = { DINIT_RP_OOM };
738             bp_sys::write(iob.get_watched_fd(), oomBuf, 1);
739         }
740         return true;
741     }
742     
743     vector<char> & pkt = outbuf.front();
744     char *data = pkt.data();
745     int written = bp_sys::write(iob.get_watched_fd(), data + outpkt_index, pkt.size() - outpkt_index);
746     if (written == -1) {
747         if (errno == EPIPE) {
748             // read end closed
749             return true;
750         }
751         else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
752             // spurious readiness notification?
753         }
754         else {
755             log(loglevel_t::ERROR, "Error writing to control connection: ", strerror(errno));
756             return true;
757         }
758         return false;
759     }
760
761     outpkt_index += written;
762     if (outpkt_index == pkt.size()) {
763         // We've finished this packet, move on to the next:
764         outbuf.pop_front();
765         outpkt_index = 0;
766         if (outbuf.empty() && ! oom_close) {
767             if (! bad_conn_close) {
768                 iob.set_watches(IN_EVENTS);
769             }
770             else {
771                 return true;
772             }
773         }
774     }
775     
776     return false;
777 }
778
779 control_conn_t::~control_conn_t() noexcept
780 {
781     close(iob.get_watched_fd());
782     iob.deregister(loop);
783     
784     // Clear service listeners
785     for (auto p : service_key_map) {
786         p.first->remove_listener(this);
787     }
788     
789     active_control_conns--;
790 }