query_name: always return with NAK if service handle invalid
[oweals/dinit.git] / src / control.cc
1 #include <algorithm>
2 #include <unordered_set>
3 #include <climits>
4
5 #include "control.h"
6 #include "service.h"
7
8 // Server-side control protocol implementation. This implements the functionality that allows
9 // clients (such as dinitctl) to query service state and issue commands to control services.
10
11 namespace {
12     constexpr auto OUT_EVENTS = dasynq::OUT_EVENTS;
13     constexpr auto IN_EVENTS = dasynq::IN_EVENTS;
14
15     // Control protocol minimum compatible version and current version:
16     constexpr uint16_t min_compat_version = 1;
17     constexpr uint16_t cp_version = 1;
18
19     // check for value in a set
20     template <typename T, int N, typename U>
21     inline bool contains(const T (&v)[N], U i)
22     {
23         return std::find_if(std::begin(v), std::end(v),
24                 [=](T p){ return i == static_cast<U>(p); }) != std::end(v);
25     }
26 }
27
28 bool control_conn_t::process_packet()
29 {
30     using std::string;
31     
32     // Note that where we call queue_packet, we must generally check the return value. If it
33     // returns false it has either deleted the connection or marked it for deletion; we
34     // shouldn't touch instance members after that point.
35
36     int pktType = rbuf[0];
37     if (pktType == DINIT_CP_QUERYVERSION) {
38         // Responds with:
39         // DINIT_RP_CVERSION, (2 byte) minimum compatible version, (2 byte) actual version
40         char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 };
41         memcpy(replyBuf + 1, &min_compat_version, 2);
42         memcpy(replyBuf + 3, &cp_version, 2);
43         if (! queue_packet(replyBuf, sizeof(replyBuf))) return false;
44         rbuf.consume(1);
45         return true;
46     }
47     if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) {
48         return process_find_load(pktType);
49     }
50     if (pktType == DINIT_CP_STARTSERVICE || pktType == DINIT_CP_STOPSERVICE
51             || pktType == DINIT_CP_WAKESERVICE || pktType == DINIT_CP_RELEASESERVICE) {
52         return process_start_stop(pktType);
53     }
54     if (pktType == DINIT_CP_UNPINSERVICE) {
55         return process_unpin_service();
56     }
57     if (pktType == DINIT_CP_UNLOADSERVICE) {
58         return process_unload_service();
59     }
60     if (pktType == DINIT_CP_RELOADSERVICE) {
61         return process_reload_service();
62     }
63     if (pktType == DINIT_CP_SHUTDOWN) {
64         // Shutdown/reboot
65         if (rbuf.get_length() < 2) {
66             chklen = 2;
67             return true;
68         }
69         
70         if (contains({shutdown_type_t::REMAIN, shutdown_type_t::HALT,
71                 shutdown_type_t::POWEROFF, shutdown_type_t::REBOOT}, rbuf[1])) {
72             auto sd_type = static_cast<shutdown_type_t>(rbuf[1]);
73
74             services->stop_all_services(sd_type);
75             char ackBuf[] = { DINIT_RP_ACK };
76             if (! queue_packet(ackBuf, 1)) return false;
77
78             // Clear the packet from the buffer
79             rbuf.consume(2);
80             chklen = 0;
81             return true;
82         }
83
84         // (otherwise fall through to below).
85     }
86     if (pktType == DINIT_CP_LISTSERVICES) {
87         return list_services();
88     }
89     if (pktType == DINIT_CP_ADD_DEP) {
90         return add_service_dep();
91     }
92     if (pktType == DINIT_CP_REM_DEP) {
93         return rm_service_dep();
94     }
95     if (pktType == DINIT_CP_QUERY_LOAD_MECH) {
96         return query_load_mech();
97     }
98     if (pktType == DINIT_CP_ENABLESERVICE) {
99         return add_service_dep(true);
100     }
101     if (pktType == DINIT_CP_QUERYSERVICENAME) {
102         return process_query_name();
103     }
104
105     // Unrecognized: give error response
106     char outbuf[] = { DINIT_RP_BADREQ };
107     if (! queue_packet(outbuf, 1)) return false;
108     bad_conn_close = true;
109     iob.set_watches(OUT_EVENTS);
110     return true;
111 }
112
113 bool control_conn_t::process_find_load(int pktType)
114 {
115     using std::string;
116     
117     constexpr int pkt_size = 4;
118     
119     if (rbuf.get_length() < pkt_size) {
120         chklen = pkt_size;
121         return true;
122     }
123     
124     uint16_t svcSize;
125     rbuf.extract((char *)&svcSize, 1, 2);
126     if (svcSize <= 0 || svcSize > (1024 - 3)) {
127         // Queue error response / mark connection bad
128         char badreqRep[] = { DINIT_RP_BADREQ };
129         if (! queue_packet(badreqRep, 1)) return false;
130         bad_conn_close = true;
131         iob.set_watches(OUT_EVENTS);
132         return true;
133     }
134     chklen = svcSize + 3; // packet type + (2 byte) length + service name
135     
136     if (rbuf.get_length() < chklen) {
137         // packet not complete yet; read more
138         return true;
139     }
140     
141     service_record * record = nullptr;
142     
143     string serviceName = rbuf.extract_string(3, svcSize);
144     
145     if (pktType == DINIT_CP_LOADSERVICE) {
146         // LOADSERVICE
147         try {
148             record = services->load_service(serviceName.c_str());
149         }
150         catch (service_load_exc &slexc) {
151             log(loglevel_t::ERROR, "Could not load service ", slexc.service_name, ": ",
152                     slexc.exc_description);
153         }
154     }
155     else {
156         // FINDSERVICE
157         record = services->find_service(serviceName.c_str());
158     }
159     
160     if (record != nullptr) {
161         // Allocate a service handle
162         handle_t handle = allocate_service_handle(record);
163         std::vector<char> rp_buf;
164         rp_buf.reserve(7);
165         rp_buf.push_back(DINIT_RP_SERVICERECORD);
166         rp_buf.push_back(static_cast<char>(record->get_state()));
167         for (int i = 0; i < (int) sizeof(handle); i++) {
168             rp_buf.push_back(*(((char *) &handle) + i));
169         }
170         rp_buf.push_back(static_cast<char>(record->get_target_state()));
171         if (! queue_packet(std::move(rp_buf))) return false;
172     }
173     else {
174         std::vector<char> rp_buf = { DINIT_RP_NOSERVICE };
175         if (! queue_packet(std::move(rp_buf))) return false;
176     }
177     
178     // Clear the packet from the buffer
179     rbuf.consume(chklen);
180     chklen = 0;
181     return true;
182 }
183
184 bool control_conn_t::check_dependents(service_record *service, bool &had_dependents)
185 {
186     std::vector<char> reply_pkt;
187     size_t num_depts = 0;
188
189     for (service_dep *dep : service->get_dependents()) {
190         if (dep->dep_type == dependency_type::REGULAR && dep->holding_acq) {
191             num_depts++;
192             // find or allocate a service handle
193             handle_t dept_handle = allocate_service_handle(dep->get_from());
194             if (reply_pkt.empty()) {
195                 // packet type, size
196                 reply_pkt.reserve(1 + sizeof(size_t) + sizeof(handle_t));
197                 reply_pkt.resize(1 + sizeof(size_t));
198                 reply_pkt[0] = DINIT_RP_DEPENDENTS;
199             }
200             auto old_size = reply_pkt.size();
201             reply_pkt.resize(old_size + sizeof(handle_t));
202             memcpy(reply_pkt.data() + old_size, &dept_handle, sizeof(dept_handle));
203         }
204     }
205
206     if (num_depts != 0) {
207         // There are affected dependents
208         had_dependents = true;
209         memcpy(reply_pkt.data() + 1, &num_depts, sizeof(num_depts));
210         return queue_packet(std::move(reply_pkt));
211     }
212
213     had_dependents = false;
214     return true;
215 }
216
217 bool control_conn_t::process_start_stop(int pktType)
218 {
219     using std::string;
220     
221     constexpr int pkt_size = 2 + sizeof(handle_t);
222     
223     if (rbuf.get_length() < pkt_size) {
224         chklen = pkt_size;
225         return true;
226     }
227     
228     // 1 byte: packet type
229     // 1 byte: flags eg. pin in requested state (0 = no pin, 1 = pin)
230     // 4 bytes: service handle
231     
232     bool do_pin = ((rbuf[1] & 1) == 1);
233     handle_t handle;
234     rbuf.extract((char *) &handle, 2, sizeof(handle));
235     
236     service_record *service = find_service_for_key(handle);
237     if (service == nullptr) {
238         // Service handle is bad
239         char badreqRep[] = { DINIT_RP_BADREQ };
240         if (! queue_packet(badreqRep, 1)) return false;
241         bad_conn_close = true;
242         iob.set_watches(OUT_EVENTS);
243         return true;
244     }
245     else {
246         char ack_buf[1] = { DINIT_RP_ACK };
247         
248         switch (pktType) {
249         case DINIT_CP_STARTSERVICE:
250             // start service, mark as required
251             if (services->is_shutting_down()) {
252                 ack_buf[0] = DINIT_RP_NAK;
253                 break;
254             }
255             if (do_pin) service->pin_start();
256             service->start();
257             services->process_queues();
258             if (service->get_state() == service_state_t::STARTED) ack_buf[0] = DINIT_RP_ALREADYSS;
259             break;
260         case DINIT_CP_STOPSERVICE:
261         {
262             // force service to stop
263             bool do_restart = ((rbuf[1] & 4) == 4);
264             bool gentle = ((rbuf[1] & 2) == 2) || do_restart;  // restart is always "gentle"
265             if (do_restart && services->is_shutting_down()) {
266                 ack_buf[0] = DINIT_RP_NAK;
267                 break;
268             }
269             if (gentle) {
270                 // Check dependents; return appropriate response if any will be affected
271                 bool has_dependents;
272                 if (! check_dependents(service, has_dependents)) {
273                     return false;
274                 }
275                 if (has_dependents) {
276                     // Reply packet has already been sent
277                     goto clear_out;
278                 }
279             }
280             service_state_t wanted_state;
281             if (do_restart) {
282                 if (! service->restart()) {
283                     ack_buf[0] = DINIT_RP_NAK;
284                     break;
285                 }
286                 wanted_state = service_state_t::STARTED;
287             }
288             else {
289                 if (do_pin) service->pin_stop();
290                 service->stop(true);
291                 wanted_state = service_state_t::STOPPED;
292             }
293             service->forced_stop();
294             services->process_queues();
295             if (service->get_state() == wanted_state && !do_restart) ack_buf[0] = DINIT_RP_ALREADYSS;
296             break;
297         }
298         case DINIT_CP_WAKESERVICE:
299         {
300             // re-attach a service to its (started) dependents, causing it to start.
301             if (services->is_shutting_down()) {
302                 ack_buf[0] = DINIT_RP_NAK;
303                 break;
304             }
305             bool found_dpt = false;
306             for (auto dpt : service->get_dependents()) {
307                 auto from = dpt->get_from();
308                 auto from_state = from->get_state();
309                 if (from_state == service_state_t::STARTED || from_state == service_state_t::STARTING) {
310                     found_dpt = true;
311                     if (! dpt->holding_acq) {
312                         dpt->get_from()->start_dep(*dpt);
313                     }
314                 }
315             }
316             if (! found_dpt) {
317                 ack_buf[0] = DINIT_RP_NAK;
318             }
319
320             if (do_pin) service->pin_start();
321             services->process_queues();
322             if (service->get_state() == service_state_t::STARTED) ack_buf[0] = DINIT_RP_ALREADYSS;
323             break;
324         }
325         case DINIT_CP_RELEASESERVICE:
326             // remove required mark, stop if not required by dependents
327             if (do_pin) service->pin_stop();
328             service->stop(false);
329             services->process_queues();
330             if (service->get_state() == service_state_t::STOPPED) ack_buf[0] = DINIT_RP_ALREADYSS;
331             break;
332         }
333         
334         if (! queue_packet(ack_buf, 1)) return false;
335     }
336     
337     clear_out:
338     // Clear the packet from the buffer
339     rbuf.consume(pkt_size);
340     chklen = 0;
341     return true;
342 }
343
344 bool control_conn_t::process_unpin_service()
345 {
346     using std::string;
347     
348     constexpr int pkt_size = 1 + sizeof(handle_t);
349     
350     if (rbuf.get_length() < pkt_size) {
351         chklen = pkt_size;
352         return true;
353     }
354     
355     // 1 byte: packet type
356     // 4 bytes: service handle
357     
358     handle_t handle;
359     rbuf.extract((char *) &handle, 1, sizeof(handle));
360     
361     service_record *service = find_service_for_key(handle);
362     if (service == nullptr) {
363         // Service handle is bad
364         char badreqRep[] = { DINIT_RP_BADREQ };
365         if (! queue_packet(badreqRep, 1)) return false;
366         bad_conn_close = true;
367         iob.set_watches(OUT_EVENTS);
368         return true;
369     }
370
371     service->unpin();
372     services->process_queues();
373     char ack_buf[] = { (char) DINIT_RP_ACK };
374     if (! queue_packet(ack_buf, 1)) return false;
375     
376     // Clear the packet from the buffer
377     rbuf.consume(pkt_size);
378     chklen = 0;
379     return true;
380 }
381
382 bool control_conn_t::process_unload_service()
383 {
384     using std::string;
385
386     constexpr int pkt_size = 1 + sizeof(handle_t);
387
388     if (rbuf.get_length() < pkt_size) {
389         chklen = pkt_size;
390         return true;
391     }
392
393     // 1 byte: packet type
394     // 4 bytes: service handle
395
396     handle_t handle;
397     rbuf.extract((char *) &handle, 1, sizeof(handle));
398
399     service_record *service = find_service_for_key(handle);
400     if (service == nullptr) {
401         // Service handle is bad
402         char badreq_rep[] = { DINIT_RP_BADREQ };
403         if (! queue_packet(badreq_rep, 1)) return false;
404         bad_conn_close = true;
405         iob.set_watches(OUT_EVENTS);
406         return true;
407     }
408
409     if (! service->has_lone_ref() || service->get_state() != service_state_t::STOPPED) {
410         // Cannot unload: has other references
411         char nak_rep[] = { DINIT_RP_NAK };
412         if (! queue_packet(nak_rep, 1)) return false;
413     }
414     else {
415         // unload
416         service->prepare_for_unload();
417         services->remove_service(service);
418         delete service;
419
420         // drop handle
421         service_key_map.erase(service);
422         key_service_map.erase(handle);
423
424         // send ack
425         char ack_buf[] = { (char) DINIT_RP_ACK };
426         if (! queue_packet(ack_buf, 1)) return false;
427     }
428
429     // Clear the packet from the buffer
430     rbuf.consume(pkt_size);
431     chklen = 0;
432     return true;
433 }
434
435 bool control_conn_t::process_reload_service()
436 {
437     using std::string;
438
439     constexpr int pkt_size = 1 + sizeof(handle_t);
440
441     if (rbuf.get_length() < pkt_size) {
442         chklen = pkt_size;
443         return true;
444     }
445
446     // 1 byte: packet type
447     // 4 bytes: service handle
448
449     handle_t handle;
450     rbuf.extract((char *) &handle, 1, sizeof(handle));
451
452     service_record *service = find_service_for_key(handle);
453     if (service == nullptr) {
454         // Service handle is bad
455         char badreq_rep[] = { DINIT_RP_BADREQ };
456         if (! queue_packet(badreq_rep, 1)) return false;
457         bad_conn_close = true;
458         iob.set_watches(OUT_EVENTS);
459         return true;
460     }
461
462     if (! service->has_lone_ref(false)) {
463         // Cannot unload: has other references
464         char nak_rep[] = { DINIT_RP_NAK };
465         if (! queue_packet(nak_rep, 1)) return false;
466     }
467     else {
468         try {
469             // reload
470             auto *new_service = services->reload_service(service);
471             if (new_service != service) {
472                 service->prepare_for_unload();
473                 services->replace_service(service, new_service);
474                 delete service;
475             }
476             else {
477                 service->remove_listener(this);
478             }
479
480             // drop handle
481             key_service_map.erase(handle);
482             service_key_map.erase(service);
483
484             services->process_queues();
485
486             // send ack
487             char ack_buf[] = { (char) DINIT_RP_ACK };
488             if (! queue_packet(ack_buf, 1)) return false;
489         }
490         catch (service_load_exc &slexc) {
491             log(loglevel_t::ERROR, "Could not reload service ", slexc.service_name, ": ",
492                     slexc.exc_description);
493             char nak_rep[] = { DINIT_RP_NAK };
494             if (! queue_packet(nak_rep, 1)) return false;
495         }
496     }
497
498     // Clear the packet from the buffer
499     rbuf.consume(pkt_size);
500     chklen = 0;
501     return true;
502 }
503
504 bool control_conn_t::list_services()
505 {
506     rbuf.consume(1); // clear request packet
507     chklen = 0;
508     
509     try {
510         auto slist = services->list_services();
511         for (auto sptr : slist) {
512             std::vector<char> pkt_buf;
513             
514             int hdrsize = 8 + std::max(sizeof(int), sizeof(pid_t));
515
516             const std::string &name = sptr->get_name();
517             int nameLen = std::min((size_t)256, name.length());
518             pkt_buf.resize(hdrsize + nameLen);
519             
520             pkt_buf[0] = DINIT_RP_SVCINFO;
521             pkt_buf[1] = nameLen;
522             pkt_buf[2] = static_cast<char>(sptr->get_state());
523             pkt_buf[3] = static_cast<char>(sptr->get_target_state());
524             
525             char b0 = sptr->is_waiting_for_console() ? 1 : 0;
526             b0 |= sptr->has_console() ? 2 : 0;
527             b0 |= sptr->was_start_skipped() ? 4 : 0;
528             pkt_buf[4] = b0;
529             pkt_buf[5] = static_cast<char>(sptr->get_stop_reason());
530
531             pkt_buf[6] = 0; // reserved
532             pkt_buf[7] = 0;
533             
534             // Next: either the exit status, or the process ID
535             if (sptr->get_state() != service_state_t::STOPPED) {
536                 pid_t proc_pid = sptr->get_pid();
537                 memcpy(pkt_buf.data() + 8, &proc_pid, sizeof(proc_pid));
538             }
539             else {
540                 int exit_status = sptr->get_exit_status();
541                 memcpy(pkt_buf.data() + 8, &exit_status, sizeof(exit_status));
542             }
543
544             for (int i = 0; i < nameLen; i++) {
545                 pkt_buf[hdrsize+i] = name[i];
546             }
547             
548             if (! queue_packet(std::move(pkt_buf))) return false;
549         }
550         
551         char ack_buf[] = { (char) DINIT_RP_LISTDONE };
552         if (! queue_packet(ack_buf, 1)) return false;
553         
554         return true;
555     }
556     catch (std::bad_alloc &exc)
557     {
558         do_oom_close();
559         return true;
560     }
561 }
562
563 bool control_conn_t::add_service_dep(bool do_enable)
564 {
565     // 1 byte packet type
566     // 1 byte dependency type
567     // handle: "from"
568     // handle: "to"
569
570     constexpr int pkt_size = 2 + sizeof(handle_t) * 2;
571
572     if (rbuf.get_length() < pkt_size) {
573         chklen = pkt_size;
574         return true;
575     }
576
577     handle_t from_handle;
578     handle_t to_handle;
579     rbuf.extract((char *) &from_handle, 2, sizeof(from_handle));
580     rbuf.extract((char *) &to_handle, 2 + sizeof(from_handle), sizeof(to_handle));
581
582     service_record *from_service = find_service_for_key(from_handle);
583     service_record *to_service = find_service_for_key(to_handle);
584     if (from_service == nullptr || to_service == nullptr || from_service == to_service) {
585         // Service handle is bad
586         char badreq_rep[] = { DINIT_RP_BADREQ };
587         if (! queue_packet(badreq_rep, 1)) return false;
588         bad_conn_close = true;
589         iob.set_watches(OUT_EVENTS);
590         return true;
591     }
592
593     // Check dependency type is valid:
594     int dep_type_int = rbuf[1];
595     if (! contains({dependency_type::MILESTONE, dependency_type::REGULAR,
596             dependency_type::WAITS_FOR}, dep_type_int)) {
597         char badreqRep[] = { DINIT_RP_BADREQ };
598         if (! queue_packet(badreqRep, 1)) return false;
599         bad_conn_close = true;
600         iob.set_watches(OUT_EVENTS);
601     }
602     dependency_type dep_type = static_cast<dependency_type>(dep_type_int);
603
604     // Check current service states are valid for given dep type
605     if (dep_type == dependency_type::REGULAR) {
606         if (from_service->get_state() != service_state_t::STOPPED &&
607                 to_service->get_state() != service_state_t::STARTED) {
608             // Cannot create dependency now since it would be contradicted:
609             char nak_rep[] = { DINIT_RP_NAK };
610             if (! queue_packet(nak_rep, 1)) return false;
611             rbuf.consume(pkt_size);
612             chklen = 0;
613             return true;
614         }
615     }
616
617     // Check for creation of circular dependency chain
618     std::unordered_set<service_record *> dep_marks;
619     std::vector<service_record *> dep_queue;
620     dep_queue.push_back(to_service);
621     while (! dep_queue.empty()) {
622         service_record * sr = dep_queue.back();
623         dep_queue.pop_back();
624         // iterate deps; if dep == from, abort; otherwise add to set/queue
625         // (only add to queue if not already in set)
626         for (auto &dep : sr->get_dependencies()) {
627             service_record * dep_to = dep.get_to();
628             if (dep_to == from_service) {
629                 // fail, circular dependency!
630                 char nak_rep[] = { DINIT_RP_NAK };
631                 if (! queue_packet(nak_rep, 1)) return false;
632                 rbuf.consume(pkt_size);
633                 chklen = 0;
634                 return true;
635             }
636             if (dep_marks.insert(dep_to).second) {
637                 dep_queue.push_back(dep_to);
638             }
639         }
640     }
641     dep_marks.clear();
642     dep_queue.clear();
643
644     bool dep_exists = false;
645     service_dep * dep_record = nullptr;
646
647     // Prevent creation of duplicate dependency:
648     for (auto &dep : from_service->get_dependencies()) {
649         service_record * dep_to = dep.get_to();
650         if (dep_to == to_service && dep.dep_type == dep_type) {
651             // Dependency already exists
652             dep_exists = true;
653             dep_record = &dep;
654             break;
655         }
656     }
657
658     if (! dep_exists) {
659         // Create dependency:
660         dep_record = &(from_service->add_dep(to_service, dep_type));
661         services->process_queues();
662     }
663
664     if (do_enable && contains({service_state_t::STARTED, service_state_t::STARTING},
665             from_service->get_state())) {
666         // The dependency record is activated: mark it as holding acquisition of the dependency, and start
667         // the dependency.
668         if (!services->is_shutting_down()) {
669             dep_record->get_from()->start_dep(*dep_record);
670             services->process_queues();
671         }
672     }
673
674     char ack_rep[] = { DINIT_RP_ACK };
675     if (! queue_packet(ack_rep, 1)) return false;
676     rbuf.consume(pkt_size);
677     chklen = 0;
678     return true;
679 }
680
681 bool control_conn_t::rm_service_dep()
682 {
683     // 1 byte packet type
684     // 1 byte dependency type
685     // handle: "from"
686     // handle: "to"
687
688     constexpr int pkt_size = 2 + sizeof(handle_t) * 2;
689
690     if (rbuf.get_length() < pkt_size) {
691         chklen = pkt_size;
692         return true;
693     }
694
695     handle_t from_handle;
696     handle_t to_handle;
697     rbuf.extract((char *) &from_handle, 2, sizeof(from_handle));
698     rbuf.extract((char *) &to_handle, 2 + sizeof(from_handle), sizeof(to_handle));
699
700     service_record *from_service = find_service_for_key(from_handle);
701     service_record *to_service = find_service_for_key(to_handle);
702     if (from_service == nullptr || to_service == nullptr || from_service == to_service) {
703         // Service handle is bad
704         char badreq_rep[] = { DINIT_RP_BADREQ };
705         if (! queue_packet(badreq_rep, 1)) return false;
706         bad_conn_close = true;
707         iob.set_watches(OUT_EVENTS);
708         return true;
709     }
710
711     // Check dependency type is valid:
712     int dep_type_int = rbuf[1];
713     if (! contains({dependency_type::MILESTONE, dependency_type::REGULAR,
714             dependency_type::WAITS_FOR}, dep_type_int)) {
715         char badreqRep[] = { DINIT_RP_BADREQ };
716         if (! queue_packet(badreqRep, 1)) return false;
717         bad_conn_close = true;
718         iob.set_watches(OUT_EVENTS);
719     }
720     dependency_type dep_type = static_cast<dependency_type>(dep_type_int);
721
722     // Remove dependency:
723     from_service->rm_dep(to_service, dep_type);
724     services->process_queues();
725
726     char ack_rep[] = { DINIT_RP_ACK };
727     if (! queue_packet(ack_rep, 1)) return false;
728     rbuf.consume(pkt_size);
729     chklen = 0;
730     return true;
731 }
732
733 bool control_conn_t::process_query_name()
734 {
735     // 1 byte packet type
736     // 1 byte reserved
737     // handle: service
738     constexpr int pkt_size = 2 + sizeof(handle_t);
739
740     if (rbuf.get_length() < pkt_size) {
741         chklen = pkt_size;
742         return true;
743     }
744
745     // Reply:
746     // 1 byte packet type = DINIT_RP_SERVICENAME
747     // 1 byte reserved
748     // uint16_t length
749     // N bytes name
750
751     handle_t handle;
752     rbuf.extract(&handle, 2, sizeof(handle));
753     rbuf.consume(pkt_size);
754     chklen = 0;
755
756     service_record *service = find_service_for_key(handle);
757     if (service == nullptr || service->get_name().length() > std::numeric_limits<uint16_t>::max()) {
758         char nak_rep[] = { DINIT_RP_NAK };
759         return queue_packet(nak_rep, 1);
760     }
761
762     std::vector<char> reply;
763     const std::string &name = service->get_name();
764     uint16_t name_length = name.length();
765     reply.resize(2 + sizeof(uint16_t) + name_length);
766     reply[0] = DINIT_RP_SERVICENAME;
767     memcpy(reply.data() + 2, &name_length, sizeof(name_length));
768     memcpy(reply.data() + 2 + sizeof(uint16_t), name.c_str(), name_length);
769
770     return queue_packet(std::move(reply));
771 }
772
773 bool control_conn_t::query_load_mech()
774 {
775     rbuf.consume(1);
776     chklen = 0;
777
778     if (services->get_set_type_id() == SSET_TYPE_DIRLOAD) {
779         dirload_service_set *dss = static_cast<dirload_service_set *>(services);
780         std::vector<char> reppkt;
781         reppkt.resize(2 + sizeof(uint32_t) * 2);  // packet type, loader type, packet size, # dirs
782         reppkt[0] = DINIT_RP_LOADER_MECH;
783         reppkt[1] = SSET_TYPE_DIRLOAD;
784
785         // Number of directories in load path:
786         uint32_t sdirs = dss->get_service_dir_count();
787         std::memcpy(reppkt.data() + 2 + sizeof(uint32_t), &sdirs, sizeof(sdirs));
788
789         // Our current working directory, which above are relative to:
790         // leave sizeof(uint32_t) for size, which we'll fill in afterwards:
791         std::size_t curpos = reppkt.size() + sizeof(uint32_t);
792 #ifdef PATH_MAX
793         uint32_t try_path_size = PATH_MAX;
794 #else
795         uint32_t try_path_size = 2048;
796 #endif
797         char *wd;
798         while (true) {
799             std::size_t total_size = curpos + std::size_t(try_path_size);
800             if (total_size < curpos) {
801                 // Overflow. In theory we could now limit to size_t max, but the size must already
802                 // be crazy long; let's abort.
803                 char ack_rep[] = { DINIT_RP_NAK };
804                 if (! queue_packet(ack_rep, 1)) return false;
805                 return true;
806             }
807             reppkt.resize(total_size);
808             wd = getcwd(reppkt.data() + curpos, try_path_size);
809             if (wd != nullptr) break;
810
811             // Keep doubling the path size we try until it's big enough, or we get numeric overflow
812             uint32_t new_try_path_size = try_path_size * uint32_t(2u);
813             if (new_try_path_size < try_path_size) {
814                 // Overflow.
815                 char ack_rep[] = { DINIT_RP_NAK };
816                 return queue_packet(ack_rep, 1);
817             }
818             try_path_size = new_try_path_size;
819         }
820
821         uint32_t wd_len = std::strlen(reppkt.data() + curpos);
822         reppkt.resize(curpos + std::size_t(wd_len));
823         std::memcpy(reppkt.data() + curpos - sizeof(uint32_t), &wd_len, sizeof(wd_len));
824
825         // Each directory in the load path:
826         for (int i = 0; uint32_t(i) < sdirs; i++) {
827             const char *sdir = dss->get_service_dir(i);
828             uint32_t dlen = std::strlen(sdir);
829             auto cursize = reppkt.size();
830             reppkt.resize(cursize + sizeof(dlen) + dlen);
831             std::memcpy(reppkt.data() + cursize, &dlen, sizeof(dlen));
832             std::memcpy(reppkt.data() + cursize + sizeof(dlen), sdir, dlen);
833         }
834
835         // Total packet size:
836         uint32_t fsize = reppkt.size();
837         std::memcpy(reppkt.data() + 2, &fsize, sizeof(fsize));
838
839         if (! queue_packet(std::move(reppkt))) return false;
840         return true;
841     }
842     else {
843         // If we don't know how to deal with the service set type, send a NAK reply:
844         char ack_rep[] = { DINIT_RP_NAK };
845         return queue_packet(ack_rep, 1);
846     }
847 }
848
849 control_conn_t::handle_t control_conn_t::allocate_service_handle(service_record *record)
850 {
851     // Try to find a unique handle (integer) in a single pass. Since the map is ordered, we can search until
852     // we find a gap in the handle values.
853     handle_t candidate = 0;
854     for (auto p : key_service_map) {
855         if (p.first == candidate) ++candidate;
856         else break;
857     }
858
859     bool is_unique = (service_key_map.find(record) == service_key_map.end());
860
861     // The following operations perform allocation (can throw std::bad_alloc). If an exception occurs we
862     // must undo any previous actions:
863     if (is_unique) {
864         record->add_listener(this);
865     }
866     
867     try {
868         key_service_map[candidate] = record;
869         service_key_map.insert(std::make_pair(record, candidate));
870     }
871     catch (...) {
872         if (is_unique) {
873             record->remove_listener(this);
874         }
875
876         key_service_map.erase(candidate);
877     }
878     
879     return candidate;
880 }
881
882 bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept
883 {
884     int in_flag = bad_conn_close ? 0 : IN_EVENTS;
885     bool was_empty = outbuf.empty();
886
887     // If the queue is empty, we can try to write the packet out now rather than queueing it.
888     // If the write is unsuccessful or partial, we queue the remainder.
889     if (was_empty) {
890         int wr = bp_sys::write(iob.get_watched_fd(), pkt, size);
891         if (wr == -1) {
892             if (errno == EPIPE) {
893                 return false;
894             }
895             if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
896                 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
897                 return false;
898             }
899             // EAGAIN etc: fall through to below
900         }
901         else {
902             if ((unsigned)wr == size) {
903                 // Ok, all written.
904                 iob.set_watches(in_flag);
905                 return true;
906             }
907             pkt += wr;
908             size -= wr;
909         }
910     }
911     
912     // Create a vector out of the (remaining part of the) packet:
913     try {
914         outbuf.emplace_back(pkt, pkt + size);
915         iob.set_watches(in_flag | OUT_EVENTS);
916         return true;
917     }
918     catch (std::bad_alloc &baexc) {
919         // Mark the connection bad, and stop reading further requests
920         bad_conn_close = true;
921         oom_close = true;
922         if (was_empty) {
923             // We can't send out-of-memory response as we already wrote as much as we
924             // could above. Neither can we later send the response since we have currently
925             // sent an incomplete packet. All we can do is close the connection.
926             return false;
927         }
928         else {
929             iob.set_watches(OUT_EVENTS);
930             return true;
931         }
932     }
933 }
934
935 // This queue_packet method is frustratingly similar to the one above, but the subtle differences
936 // make them extraordinary difficult to combine into a single method.
937 bool control_conn_t::queue_packet(std::vector<char> &&pkt) noexcept
938 {
939     int in_flag = bad_conn_close ? 0 : IN_EVENTS;
940     bool was_empty = outbuf.empty();
941     
942     if (was_empty) {
943         outpkt_index = 0;
944         // We can try sending the packet immediately:
945         int wr = bp_sys::write(iob.get_watched_fd(), pkt.data(), pkt.size());
946         if (wr == -1) {
947             if (errno == EPIPE) {
948                 return false;
949             }
950             if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
951                 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
952                 return false;
953             }
954             // EAGAIN etc: fall through to below
955         }
956         else {
957             if ((unsigned)wr == pkt.size()) {
958                 // Ok, all written.
959                 iob.set_watches(in_flag);
960                 return true;
961             }
962             outpkt_index = wr;
963         }
964     }
965     
966     try {
967         outbuf.emplace_back(pkt);
968         iob.set_watches(in_flag | OUT_EVENTS);
969         return true;
970     }
971     catch (std::bad_alloc &baexc) {
972         // Mark the connection bad, and stop reading further requests
973         bad_conn_close = true;
974         oom_close = true;
975         if (was_empty) {
976             // We can't send out-of-memory response as we already wrote as much as we
977             // could above. Neither can we later send the response since we have currently
978             // sent an incomplete packet. All we can do is close the connection.
979             return false;
980         }
981         else {
982             iob.set_watches(OUT_EVENTS);
983             return true;
984         }
985     }
986 }
987
988 bool control_conn_t::data_ready() noexcept
989 {
990     int fd = iob.get_watched_fd();
991     
992     int r = rbuf.fill(fd);
993     
994     // Note file descriptor is non-blocking
995     if (r == -1) {
996         if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
997             log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
998             return true;
999         }
1000         return false;
1001     }
1002     
1003     if (r == 0) {
1004         return true;
1005     }
1006     
1007     // complete packet?
1008     if (rbuf.get_length() >= chklen) {
1009         try {
1010             return !process_packet();
1011         }
1012         catch (std::bad_alloc &baexc) {
1013             do_oom_close();
1014             return false;
1015         }
1016     }
1017     else if (rbuf.get_length() == 1024) {
1018         // Too big packet
1019         log(loglevel_t::WARN, "Received too-large control package; dropping connection");
1020         bad_conn_close = true;
1021         iob.set_watches(OUT_EVENTS);
1022     }
1023     else {
1024         int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0;
1025         iob.set_watches(IN_EVENTS | out_flags);
1026     }
1027     
1028     return false;
1029 }
1030
1031 bool control_conn_t::send_data() noexcept
1032 {
1033     if (outbuf.empty() && bad_conn_close) {
1034         if (oom_close) {
1035             // Send oom response
1036             char oomBuf[] = { DINIT_RP_OOM };
1037             bp_sys::write(iob.get_watched_fd(), oomBuf, 1);
1038         }
1039         return true;
1040     }
1041     
1042     vector<char> & pkt = outbuf.front();
1043     char *data = pkt.data();
1044     int written = bp_sys::write(iob.get_watched_fd(), data + outpkt_index, pkt.size() - outpkt_index);
1045     if (written == -1) {
1046         if (errno == EPIPE) {
1047             // read end closed
1048             return true;
1049         }
1050         else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
1051             // spurious readiness notification?
1052         }
1053         else {
1054             log(loglevel_t::ERROR, "Error writing to control connection: ", strerror(errno));
1055             return true;
1056         }
1057         return false;
1058     }
1059
1060     outpkt_index += written;
1061     if (outpkt_index == pkt.size()) {
1062         // We've finished this packet, move on to the next:
1063         outbuf.pop_front();
1064         outpkt_index = 0;
1065         if (outbuf.empty() && ! oom_close) {
1066             if (! bad_conn_close) {
1067                 iob.set_watches(IN_EVENTS);
1068             }
1069             else {
1070                 return true;
1071             }
1072         }
1073     }
1074     
1075     return false;
1076 }
1077
1078 control_conn_t::~control_conn_t() noexcept
1079 {
1080     bp_sys::close(iob.get_watched_fd());
1081     iob.deregister(loop);
1082     
1083     // Clear service listeners
1084     for (auto p : service_key_map) {
1085         p.first->remove_listener(this);
1086     }
1087     
1088     active_control_conns--;
1089 }