5 constexpr auto OUT_EVENTS = dasynq::OUT_EVENTS;
6 constexpr auto IN_EVENTS = dasynq::IN_EVENTS;
9 bool control_conn_t::process_packet()
13 // Note that where we call queue_packet, we must generally check the return value. If it
14 // returns false it has either deleted the connection or marked it for deletion; we
15 // shouldn't touch instance members after that point.
17 int pktType = rbuf[0];
18 if (pktType == DINIT_CP_QUERYVERSION) {
20 // DINIT_RP_CVERSION, (2 byte) minimum compatible version, (2 byte) actual version
21 char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 };
22 if (! queue_packet(replyBuf, sizeof(replyBuf))) return false;
26 if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) {
27 return process_find_load(pktType);
29 if (pktType == DINIT_CP_STARTSERVICE || pktType == DINIT_CP_STOPSERVICE
30 || pktType == DINIT_CP_WAKESERVICE || pktType == DINIT_CP_RELEASESERVICE) {
31 return process_start_stop(pktType);
33 if (pktType == DINIT_CP_UNPINSERVICE) {
34 return process_unpin_service();
36 if (pktType == DINIT_CP_UNLOADSERVICE) {
37 return process_unload_service();
39 if (pktType == DINIT_CP_SHUTDOWN) {
41 if (rbuf.get_length() < 2) {
46 auto sd_type = static_cast<shutdown_type_t>(rbuf[1]);
48 services->stop_all_services(sd_type);
49 char ackBuf[] = { DINIT_RP_ACK };
50 if (! queue_packet(ackBuf, 1)) return false;
52 // Clear the packet from the buffer
57 if (pktType == DINIT_CP_LISTSERVICES) {
58 return list_services();
61 // Unrecognized: give error response
62 char outbuf[] = { DINIT_RP_BADREQ };
63 if (! queue_packet(outbuf, 1)) return false;
64 bad_conn_close = true;
65 iob.set_watches(OUT_EVENTS);
70 bool control_conn_t::process_find_load(int pktType)
74 constexpr int pkt_size = 4;
76 if (rbuf.get_length() < pkt_size) {
82 rbuf.extract((char *)&svcSize, 1, 2);
83 chklen = svcSize + 3; // packet type + (2 byte) length + service name
84 if (svcSize <= 0 || chklen > 1024) {
85 // Queue error response / mark connection bad
86 char badreqRep[] = { DINIT_RP_BADREQ };
87 if (! queue_packet(badreqRep, 1)) return false;
88 bad_conn_close = true;
89 iob.set_watches(OUT_EVENTS);
93 if (rbuf.get_length() < chklen) {
94 // packet not complete yet; read more
98 service_record * record = nullptr;
100 string serviceName = rbuf.extract_string(3, svcSize);
102 if (pktType == DINIT_CP_LOADSERVICE) {
105 record = services->load_service(serviceName.c_str());
107 catch (service_load_exc &slexc) {
108 log(loglevel_t::ERROR, "Could not load service ", slexc.serviceName, ": ", slexc.excDescription);
113 record = services->find_service(serviceName.c_str());
116 if (record != nullptr) {
117 // Allocate a service handle
118 handle_t handle = allocate_service_handle(record);
119 std::vector<char> rp_buf;
121 rp_buf.push_back(DINIT_RP_SERVICERECORD);
122 rp_buf.push_back(static_cast<char>(record->get_state()));
123 for (int i = 0; i < (int) sizeof(handle); i++) {
124 rp_buf.push_back(*(((char *) &handle) + i));
126 rp_buf.push_back(static_cast<char>(record->get_target_state()));
127 if (! queue_packet(std::move(rp_buf))) return false;
130 std::vector<char> rp_buf = { DINIT_RP_NOSERVICE };
131 if (! queue_packet(std::move(rp_buf))) return false;
134 // Clear the packet from the buffer
135 rbuf.consume(chklen);
140 bool control_conn_t::process_start_stop(int pktType)
144 constexpr int pkt_size = 2 + sizeof(handle_t);
146 if (rbuf.get_length() < pkt_size) {
151 // 1 byte: packet type
152 // 1 byte: pin in requested state (0 = no pin, 1 = pin)
153 // 4 bytes: service handle
155 bool do_pin = (rbuf[1] == 1);
157 rbuf.extract((char *) &handle, 2, sizeof(handle));
159 service_record *service = find_service_for_key(handle);
160 if (service == nullptr) {
161 // Service handle is bad
162 char badreqRep[] = { DINIT_RP_BADREQ };
163 if (! queue_packet(badreqRep, 1)) return false;
164 bad_conn_close = true;
165 iob.set_watches(OUT_EVENTS);
169 bool already_there = false;
172 case DINIT_CP_STARTSERVICE:
173 // start service, mark as required
174 if (do_pin) service->pin_start();
176 services->process_queues();
177 already_there = service->get_state() == service_state_t::STARTED;
179 case DINIT_CP_STOPSERVICE:
180 // force service to stop
181 if (do_pin) service->pin_stop();
183 service->forced_stop();
184 services->process_queues();
185 already_there = service->get_state() == service_state_t::STOPPED;
187 case DINIT_CP_WAKESERVICE:
188 // re-start a stopped service (do not mark as required)
189 if (do_pin) service->pin_start();
190 service->start(false);
191 services->process_queues();
192 already_there = service->get_state() == service_state_t::STARTED;
194 case DINIT_CP_RELEASESERVICE:
195 // remove required mark, stop if not required by dependents
196 if (do_pin) service->pin_stop();
197 service->stop(false);
198 services->process_queues();
199 already_there = service->get_state() == service_state_t::STOPPED;
203 char ack_buf[] = { (char)(already_there ? DINIT_RP_ALREADYSS : DINIT_RP_ACK) };
205 if (! queue_packet(ack_buf, 1)) return false;
208 // Clear the packet from the buffer
209 rbuf.consume(pkt_size);
214 bool control_conn_t::process_unpin_service()
218 constexpr int pkt_size = 1 + sizeof(handle_t);
220 if (rbuf.get_length() < pkt_size) {
225 // 1 byte: packet type
226 // 4 bytes: service handle
229 rbuf.extract((char *) &handle, 1, sizeof(handle));
231 service_record *service = find_service_for_key(handle);
232 if (service == nullptr) {
233 // Service handle is bad
234 char badreqRep[] = { DINIT_RP_BADREQ };
235 if (! queue_packet(badreqRep, 1)) return false;
236 bad_conn_close = true;
237 iob.set_watches(OUT_EVENTS);
242 services->process_queues();
243 char ack_buf[] = { (char) DINIT_RP_ACK };
244 if (! queue_packet(ack_buf, 1)) return false;
246 // Clear the packet from the buffer
247 rbuf.consume(pkt_size);
252 bool control_conn_t::process_unload_service()
256 constexpr int pkt_size = 1 + sizeof(handle_t);
258 if (rbuf.get_length() < pkt_size) {
263 // 1 byte: packet type
264 // 4 bytes: service handle
267 rbuf.extract((char *) &handle, 1, sizeof(handle));
269 service_record *service = find_service_for_key(handle);
270 if (service == nullptr) {
271 // Service handle is bad
272 char badreq_rep[] = { DINIT_RP_BADREQ };
273 if (! queue_packet(badreq_rep, 1)) return false;
274 bad_conn_close = true;
275 iob.set_watches(OUT_EVENTS);
279 if (! service->has_lone_ref() || service->get_state() != service_state_t::STOPPED) {
280 // Cannot unload: has other references
281 char nak_rep[] = { DINIT_RP_NAK };
282 if (! queue_packet(nak_rep, 1)) return false;
286 service->prepare_for_unload();
287 services->remove_service(service);
291 service_key_map.erase(service);
292 key_service_map.erase(handle);
295 char ack_buf[] = { (char) DINIT_RP_ACK };
296 if (! queue_packet(ack_buf, 1)) return false;
299 // Clear the packet from the buffer
300 rbuf.consume(pkt_size);
305 bool control_conn_t::list_services()
307 rbuf.consume(1); // clear request packet
311 auto slist = services->list_services();
312 for (auto sptr : slist) {
313 std::vector<char> pkt_buf;
315 const std::string &name = sptr->get_name();
316 int nameLen = std::min((size_t)256, name.length());
317 pkt_buf.resize(8 + nameLen);
319 pkt_buf[0] = DINIT_RP_SVCINFO;
320 pkt_buf[1] = nameLen;
321 pkt_buf[2] = static_cast<char>(sptr->get_state());
322 pkt_buf[3] = static_cast<char>(sptr->get_target_state());
324 pkt_buf[4] = 0; // reserved
329 for (int i = 0; i < nameLen; i++) {
330 pkt_buf[8+i] = name[i];
333 if (! queue_packet(std::move(pkt_buf))) return false;
336 char ack_buf[] = { (char) DINIT_RP_LISTDONE };
337 if (! queue_packet(ack_buf, 1)) return false;
341 catch (std::bad_alloc &exc)
348 control_conn_t::handle_t control_conn_t::allocate_service_handle(service_record *record)
350 // Try to find a unique handle (integer) in a single pass. Since the map is ordered, we can search until
351 // we find a gap in the handle values.
352 handle_t candidate = 0;
353 for (auto p : key_service_map) {
354 if (p.first == candidate) candidate++;
358 bool is_unique = (service_key_map.find(record) == service_key_map.end());
360 // The following operations perform allocation (can throw std::bad_alloc). If an exception occurs we
361 // must undo any previous actions:
363 record->add_listener(this);
367 key_service_map[candidate] = record;
368 service_key_map.insert(std::make_pair(record, candidate));
372 record->remove_listener(this);
375 key_service_map.erase(candidate);
381 bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept
383 int in_flag = bad_conn_close ? 0 : IN_EVENTS;
384 bool was_empty = outbuf.empty();
386 // If the queue is empty, we can try to write the packet out now rather than queueing it.
387 // If the write is unsuccessful or partial, we queue the remainder.
389 int wr = write(iob.get_watched_fd(), pkt, size);
391 if (errno == EPIPE) {
394 if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
395 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
398 // EAGAIN etc: fall through to below
401 if ((unsigned)wr == size) {
403 iob.set_watches(in_flag);
411 // Create a vector out of the (remaining part of the) packet:
413 outbuf.emplace_back(pkt, pkt + size);
414 iob.set_watches(in_flag | OUT_EVENTS);
417 catch (std::bad_alloc &baexc) {
418 // Mark the connection bad, and stop reading further requests
419 bad_conn_close = true;
422 // We can't send out-of-memory response as we already wrote as much as we
423 // could above. Neither can we later send the response since we have currently
424 // sent an incomplete packet. All we can do is close the connection.
428 iob.set_watches(OUT_EVENTS);
434 // This queue_packet method is frustratingly similar to the one above, but the subtle differences
435 // make them extraordinary difficult to combine into a single method.
436 bool control_conn_t::queue_packet(std::vector<char> &&pkt) noexcept
438 int in_flag = bad_conn_close ? 0 : IN_EVENTS;
439 bool was_empty = outbuf.empty();
443 // We can try sending the packet immediately:
444 int wr = write(iob.get_watched_fd(), pkt.data(), pkt.size());
446 if (errno == EPIPE) {
449 if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
450 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
453 // EAGAIN etc: fall through to below
456 if ((unsigned)wr == pkt.size()) {
458 iob.set_watches(in_flag);
466 outbuf.emplace_back(pkt);
467 iob.set_watches(in_flag | OUT_EVENTS);
470 catch (std::bad_alloc &baexc) {
471 // Mark the connection bad, and stop reading further requests
472 bad_conn_close = true;
475 // We can't send out-of-memory response as we already wrote as much as we
476 // could above. Neither can we later send the response since we have currently
477 // sent an incomplete packet. All we can do is close the connection.
481 iob.set_watches(OUT_EVENTS);
487 bool control_conn_t::rollback_complete() noexcept
489 char ackBuf[2] = { DINIT_ROLLBACK_COMPLETED, 2 };
490 return queue_packet(ackBuf, 2);
493 bool control_conn_t::data_ready() noexcept
495 int fd = iob.get_watched_fd();
497 int r = rbuf.fill(fd);
499 // Note file descriptor is non-blocking
501 if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
502 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
513 if (rbuf.get_length() >= chklen) {
515 return !process_packet();
517 catch (std::bad_alloc &baexc) {
522 else if (rbuf.get_length() == 1024) {
524 log(loglevel_t::WARN, "Received too-large control package; dropping connection");
525 bad_conn_close = true;
526 iob.set_watches(OUT_EVENTS);
529 int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0;
530 iob.set_watches(IN_EVENTS | out_flags);
536 bool control_conn_t::send_data() noexcept
538 if (outbuf.empty() && bad_conn_close) {
541 char oomBuf[] = { DINIT_RP_OOM };
542 write(iob.get_watched_fd(), oomBuf, 1);
547 vector<char> & pkt = outbuf.front();
548 char *data = pkt.data();
549 int written = write(iob.get_watched_fd(), data + outpkt_index, pkt.size() - outpkt_index);
551 if (errno == EPIPE) {
555 else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
556 // spurious readiness notification?
559 log(loglevel_t::ERROR, "Error writing to control connection: ", strerror(errno));
565 outpkt_index += written;
566 if (outpkt_index == pkt.size()) {
567 // We've finished this packet, move on to the next:
570 if (outbuf.empty() && ! oom_close) {
571 if (! bad_conn_close) {
572 iob.set_watches(IN_EVENTS);
583 control_conn_t::~control_conn_t() noexcept
585 close(iob.get_watched_fd());
586 iob.deregister(loop);
588 // Clear service listeners
589 for (auto p : service_key_map) {
590 p.first->remove_listener(this);
593 active_control_conns--;