// File-local shorthands for the dasynq event-mask constants; they are the values passed
// to iob.set_watches() throughout this file.
5 constexpr auto OUT_EVENTS = dasynq::OUT_EVENTS;
6 constexpr auto IN_EVENTS = dasynq::IN_EVENTS;
// Dispatch on the first byte of the receive buffer (the packet type).
//
// Find/load, start/stop/wake/release, unpin and list requests are handed to the
// corresponding helper and its result is returned. Version queries and shutdown
// requests are answered here. Any other packet type is answered with DINIT_RP_BADREQ,
// after which the connection is marked bad and only output events are watched.
//
// Returns false as soon as a queue_packet() call fails (see the note just below).
9 bool control_conn_t::process_packet()
13 // Note that where we call queue_packet, we must generally check the return value. If it
14 // returns false it has either deleted the connection or marked it for deletion; we
15 // shouldn't touch instance members after that point.
17 int pktType = rbuf[0];
18 if (pktType == DINIT_CP_QUERYVERSION) {
20 // DINIT_RP_CPVERSION, (2 byte) minimum compatible version, (2 byte) maximum compatible version
21 char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 };
// Queue the whole reply (type byte plus both 2-byte version fields). Queueing a single
// byte would leave a client that reads the documented 5-byte reply waiting for data.
22 if (! queue_packet(replyBuf, sizeof(replyBuf))) return false;
26 if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) {
27 return process_find_load(pktType);
29 if (pktType == DINIT_CP_STARTSERVICE || pktType == DINIT_CP_STOPSERVICE
30 || pktType == DINIT_CP_WAKESERVICE || pktType == DINIT_CP_RELEASESERVICE) {
31 return process_start_stop(pktType);
33 if (pktType == DINIT_CP_UNPINSERVICE) {
34 return process_unpin_service();
36 if (pktType == DINIT_CP_SHUTDOWN) {
// A shutdown request carries a second byte (the shutdown type), so at least two bytes
// must be buffered before it can be handled.
38 if (rbuf.get_length() < 2) {
// NOTE(review): the byte is cast to shutdown_type_t without a range check here --
// confirm that stop_all_services() tolerates values outside the enumeration.
43 auto sd_type = static_cast<shutdown_type_t>(rbuf[1]);
45 services->stop_all_services(sd_type);
46 char ackBuf[] = { DINIT_RP_ACK };
47 if (! queue_packet(ackBuf, 1)) return false;
49 // Clear the packet from the buffer
54 if (pktType == DINIT_CP_LISTSERVICES) {
55 return list_services();
58 // Unrecognized: give error response
// Note: this local array has the same name as the `outbuf` queue used elsewhere in
// this class and hides it for the rest of the function.
59 char outbuf[] = { DINIT_RP_BADREQ };
60 if (! queue_packet(outbuf, 1)) return false;
// Stop accepting input on this connection; only pending output is still of interest.
61 bad_conn_close = true;
62 iob.set_watches(OUT_EVENTS);
// Handle a DINIT_CP_FINDSERVICE or DINIT_CP_LOADSERVICE request; pktType says which.
//
// Request layout (see the chklen computation below): packet type, 2-byte name length,
// service name.
// Reply when a service record is obtained: DINIT_RP_SERVICERECORD, the service state,
// the raw bytes of a freshly allocated handle, and the target state.
// Reply otherwise: DINIT_RP_NOSERVICE.
// A zero or over-long length is answered with DINIT_RP_BADREQ and the connection is
// marked bad.
//
// Returns false whenever queue_packet() reports failure.
67 bool control_conn_t::process_find_load(int pktType)
// Smallest request worth inspecting: three header bytes plus one name byte (an empty
// name is rejected below).
71 constexpr int pkt_size = 4;
73 if (rbuf.get_length() < pkt_size) {
// Copy the length field out of the buffer. The arguments appear to be
// (destination, offset, byte count) -- confirm against the buffer class.
// NOTE(review): the bytes are copied as-is, so the length is presumably interpreted in
// host byte order.
79 rbuf.extract((char *)&svcSize, 1, 2);
80 chklen = svcSize + 3; // packet type + (2 byte) length + service name
81 if (svcSize <= 0 || chklen > 1024) {
82 // Queue error response / mark connection bad
83 char badreqRep[] = { DINIT_RP_BADREQ };
84 if (! queue_packet(badreqRep, 1)) return false;
85 bad_conn_close = true;
86 iob.set_watches(OUT_EVENTS);
90 if (rbuf.get_length() < chklen) {
91 // packet not complete yet; read more
// Stays null unless the load or lookup below yields a record; null selects the
// DINIT_RP_NOSERVICE reply.
95 service_record * record = nullptr;
// The name starts right after the 3-byte header.
97 string serviceName = rbuf.extract_string(3, svcSize);
99 if (pktType == DINIT_CP_LOADSERVICE) {
102 record = services->load_service(serviceName.c_str());
// A load failure is logged rather than propagated to the caller of this function.
104 catch (service_load_exc &slexc) {
105 log(loglevel_t::ERROR, "Could not load service ", slexc.serviceName, ": ", slexc.excDescription);
110 record = services->find_service(serviceName.c_str());
113 if (record != nullptr) {
114 // Allocate a service handle
115 handle_t handle = allocate_service_handle(record);
116 std::vector<char> rp_buf;
118 rp_buf.push_back(DINIT_RP_SERVICERECORD);
119 rp_buf.push_back(static_cast<char>(record->get_state()));
// Append the handle one byte at a time, exactly as it is laid out in memory.
120 for (int i = 0; i < (int) sizeof(handle); i++) {
121 rp_buf.push_back(*(((char *) &handle) + i));
123 rp_buf.push_back(static_cast<char>(record->get_target_state()));
124 if (! queue_packet(std::move(rp_buf))) return false;
127 std::vector<char> rp_buf = { DINIT_RP_NOSERVICE };
128 if (! queue_packet(std::move(rp_buf))) return false;
131 // Clear the packet from the buffer
132 rbuf.consume(chklen);
// Handle DINIT_CP_STARTSERVICE, DINIT_CP_STOPSERVICE, DINIT_CP_WAKESERVICE and
// DINIT_CP_RELEASESERVICE; pktType says which.
//
// The request holds the packet type, a pin flag and a service handle. An unknown handle
// is answered with DINIT_RP_BADREQ and the connection is marked bad. Otherwise the
// reply is DINIT_RP_ALREADYSS when the service is found in the requested state once
// the service queues have been processed, and DINIT_RP_ACK when it is not.
//
// Returns false whenever queue_packet() reports failure.
137 bool control_conn_t::process_start_stop(int pktType)
141 constexpr int pkt_size = 2 + sizeof(handle_t);
143 if (rbuf.get_length() < pkt_size) {
148 // 1 byte: packet type
149 // 1 byte: pin in requested state (0 = no pin, 1 = pin)
150 // 4 bytes: service handle
// NOTE(review): the code sizes the handle as sizeof(handle_t); the "4 bytes" above
// holds only if handle_t is four bytes wide -- confirm.
// Only the exact value 1 requests a pin; every other value means "no pin".
152 bool do_pin = (rbuf[1] == 1);
// The handle follows the type and pin bytes, hence offset 2.
154 rbuf.extract((char *) &handle, 2, sizeof(handle));
156 service_record *service = find_service_for_key(handle);
157 if (service == nullptr) {
158 // Service handle is bad
159 char badreqRep[] = { DINIT_RP_BADREQ };
160 if (! queue_packet(badreqRep, 1)) return false;
161 bad_conn_close = true;
162 iob.set_watches(OUT_EVENTS);
// Set below when the service is already in the state the request asks for.
166 bool already_there = false;
169 case DINIT_CP_STARTSERVICE:
170 // start service, mark as required
171 if (do_pin) service->pin_start();
173 services->process_queues();
174 already_there = service->get_state() == service_state_t::STARTED;
176 case DINIT_CP_STOPSERVICE:
177 // force service to stop
178 if (do_pin) service->pin_stop();
180 service->forced_stop();
181 services->process_queues();
182 already_there = service->get_state() == service_state_t::STOPPED;
184 case DINIT_CP_WAKESERVICE:
185 // re-start a stopped service (do not mark as required)
186 if (do_pin) service->pin_start();
187 service->start(false);
188 services->process_queues();
189 already_there = service->get_state() == service_state_t::STARTED;
191 case DINIT_CP_RELEASESERVICE:
192 // remove required mark, stop if not required by dependents
193 if (do_pin) service->pin_stop();
194 service->stop(false);
195 services->process_queues();
196 already_there = service->get_state() == service_state_t::STOPPED;
// Single-byte reply: "already in that state" or a plain acknowledgement.
200 char ack_buf[] = { (char)(already_there ? DINIT_RP_ALREADYSS : DINIT_RP_ACK) };
202 if (! queue_packet(ack_buf, 1)) return false;
205 // Clear the packet from the buffer
206 rbuf.consume(pkt_size);
// Handle a DINIT_CP_UNPINSERVICE request.
//
// The request holds the packet type followed by a service handle. An unknown handle is
// answered with DINIT_RP_BADREQ and the connection is marked bad; otherwise the service
// queues are processed and DINIT_RP_ACK is sent.
//
// Returns false whenever queue_packet() reports failure.
211 bool control_conn_t::process_unpin_service()
215 constexpr int pkt_size = 1 + sizeof(handle_t);
217 if (rbuf.get_length() < pkt_size) {
222 // 1 byte: packet type
223 // 4 bytes: service handle
// NOTE(review): the handle width actually used is sizeof(handle_t) -- confirm it is 4.
// The handle directly follows the type byte, hence offset 1.
226 rbuf.extract((char *) &handle, 1, sizeof(handle));
228 service_record *service = find_service_for_key(handle);
229 if (service == nullptr) {
230 // Service handle is bad
231 char badreqRep[] = { DINIT_RP_BADREQ };
232 if (! queue_packet(badreqRep, 1)) return false;
233 bad_conn_close = true;
234 iob.set_watches(OUT_EVENTS);
239 services->process_queues();
240 char ack_buf[] = { (char) DINIT_RP_ACK };
241 if (! queue_packet(ack_buf, 1)) return false;
244 // Clear the packet from the buffer
245 rbuf.consume(pkt_size);
// Handle a DINIT_CP_LISTSERVICES request: send one DINIT_RP_SVCINFO packet per service
// returned by services->list_services(), then a DINIT_RP_LISTDONE packet.
//
// DINIT_RP_SVCINFO layout as built below:
//   byte 0     packet type
//   byte 1     length of the name that follows (at most 255)
//   byte 2     current state
//   byte 3     target state
//   byte 4..   reserved (the 8-byte header is zero-filled by resize())
//   byte 8..   service name, truncated to the length given in byte 1
//
// Returns false whenever queue_packet() reports failure. std::bad_alloc is caught at the
// end of the function.
250 bool control_conn_t::list_services()
252 rbuf.consume(1); // clear request packet
256 auto slist = services->list_services();
257 for (auto sptr : slist) {
258 std::vector<char> pkt_buf;
260 const std::string &name = sptr->get_name();
// The length travels in a single byte, so it must not exceed 255: a value of 256 would
// wrap to 0 in pkt_buf[1] while 256 name bytes were still sent, desynchronising the
// receiver.
261 int nameLen = std::min((size_t)255, name.length());
262 pkt_buf.resize(8 + nameLen);
264 pkt_buf[0] = DINIT_RP_SVCINFO;
265 pkt_buf[1] = nameLen;
266 pkt_buf[2] = static_cast<char>(sptr->get_state());
267 pkt_buf[3] = static_cast<char>(sptr->get_target_state());
269 pkt_buf[4] = 0; // reserved
274 for (int i = 0; i < nameLen; i++) {
275 pkt_buf[8+i] = name[i];
278 if (! queue_packet(std::move(pkt_buf))) return false;
281 char ack_buf[] = { (char) DINIT_RP_LISTDONE };
282 if (! queue_packet(ack_buf, 1)) return false;
286 catch (std::bad_alloc &exc)
// Choose a handle value that is not currently a key of keyServiceMap, record the
// handle/record association in keyServiceMap and serviceKeyMap, and register this
// connection as a listener on the record.
//
// Throws std::bad_alloc when the candidate collides with a key and the largest key seen
// so far is already the maximum handle_t value.
// The caller receives a handle_t; presumably it is the chosen candidate -- confirm.
293 control_conn_t::handle_t control_conn_t::allocate_service_handle(service_record *record)
// Cleared if this record is already associated with some handle on this connection.
// NOTE(review): presumably used to avoid registering the listener twice -- confirm.
295 bool is_unique = true;
296 handle_t largest_seen = 0;
297 handle_t candidate = 0;
// Each time the candidate equals an existing key, move it to one past the largest key
// seen so far. NOTE(review): this finds a free value only if the map is iterated in
// ascending key order -- confirm the container type. Each entry is copied by value here.
298 for (auto p : keyServiceMap) {
299 if (p.first > largest_seen) largest_seen = p.first;
300 if (p.first == candidate) {
301 if (largest_seen == std::numeric_limits<handle_t>::max()) throw std::bad_alloc();
302 candidate = largest_seen + 1;
304 is_unique &= (p.second != record);
307 keyServiceMap[candidate] = record;
308 serviceKeyMap.insert(std::make_pair(record, candidate));
311 record->add_listener(this);
// Send `size` bytes starting at `pkt` to the peer.
//
// If no output is already queued, a direct write() is attempted first; whatever cannot be
// written immediately is appended to outbuf and output events are enabled so that it can
// be sent later. If queueing runs out of memory the connection is marked bad.
//
// Per the note in process_packet(), a false return means the connection has been deleted
// or marked for deletion and instance members must not be touched afterwards.
317 bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept
// Once the connection is marked bad, input events are no longer requested.
319 int in_flag = bad_conn_close ? 0 : IN_EVENTS;
320 bool was_empty = outbuf.empty();
322 // If the queue is empty, we can try to write the packet out now rather than queueing it.
323 // If the write is unsuccessful or partial, we queue the remainder.
325 int wr = write(iob.get_watched_fd(), pkt, size);
327 if (errno == EPIPE) {
// Only "try again later" conditions are tolerated; any other error is logged.
330 if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
331 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
334 // EAGAIN etc: fall through to below
// Everything went out in one write: nothing to queue, so no output events are needed.
337 if ((unsigned)wr == size) {
339 iob.set_watches(in_flag);
347 // Create a vector out of the (remaining part of the) packet:
349 outbuf.emplace_back(pkt, pkt + size);
350 iob.set_watches(in_flag | OUT_EVENTS);
353 catch (std::bad_alloc &baexc) {
354 // Mark the connection bad, and stop reading further requests
355 bad_conn_close = true;
358 // We can't send out-of-memory response as we already wrote as much as we
359 // could above. Neither can we later send the response since we have currently
360 // sent an incomplete packet. All we can do is close the connection.
364 iob.set_watches(OUT_EVENTS);
370 // This queue_packet method is frustratingly similar to the one above, but the subtle differences
371 // make them extraordinarily difficult to combine into a single method.
// Send a complete packet held in a vector to the peer, taking ownership of its storage.
//
// If no output is already queued, a direct write() is attempted first. When the packet
// cannot be written out completely it is placed on outbuf and output events are enabled.
// If queueing runs out of memory the connection is marked bad.
//
// Per the note in process_packet(), a false return means the connection has been deleted
// or marked for deletion and instance members must not be touched afterwards.
372 bool control_conn_t::queue_packet(std::vector<char> &&pkt) noexcept
// Once the connection is marked bad, input events are no longer requested.
374 int in_flag = bad_conn_close ? 0 : IN_EVENTS;
375 bool was_empty = outbuf.empty();
379 // We can try sending the packet immediately:
380 int wr = write(iob.get_watched_fd(), pkt.data(), pkt.size());
382 if (errno == EPIPE) {
// Only "try again later" conditions are tolerated; any other error is logged.
385 if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
386 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
389 // EAGAIN etc: fall through to below
// Everything went out in one write: nothing to queue, so no output events are needed.
392 if ((unsigned)wr == pkt.size()) {
394 iob.set_watches(in_flag);
// `pkt` is a named rvalue reference and therefore an lvalue: without std::move the
// queue would copy the whole packet (one more allocation that can fail) instead of
// taking over the caller's buffer. The packet is not used again after this point.
402 outbuf.emplace_back(std::move(pkt));
403 iob.set_watches(in_flag | OUT_EVENTS);
406 catch (std::bad_alloc &baexc) {
407 // Mark the connection bad, and stop reading further requests
408 bad_conn_close = true;
411 // We can't send out-of-memory response as we already wrote as much as we
412 // could above. Neither can we later send the response since we have currently
413 // sent an incomplete packet. All we can do is close the connection.
417 iob.set_watches(OUT_EVENTS);
// Queue the two-byte packet { DINIT_ROLLBACK_COMPLETED, 2 } for the peer and return the
// result of queue_packet().
// NOTE(review): the second byte is presumably the packet length -- confirm against the
// protocol definition.
423 bool control_conn_t::rollback_complete() noexcept
425 char ackBuf[2] = { DINIT_ROLLBACK_COMPLETED, 2 };
426 return queue_packet(ackBuf, 2);
// Pull newly arrived bytes from the connection into rbuf and, once at least chklen bytes
// are buffered, hand the buffer to process_packet().
//
// A buffer that reaches 1024 bytes without satisfying chklen is treated as an oversized
// request: a warning is logged and the connection is marked bad. Otherwise the watch set
// is re-armed for input, plus output if the connection is bad or data is still queued.
//
// When a packet is processed, the value returned is the negation of process_packet().
// NOTE(review): presumably a true return asks the caller to drop the connection --
// confirm at the call site.
429 bool control_conn_t::data_ready() noexcept
431 int fd = iob.get_watched_fd();
433 int r = rbuf.fill(fd);
435 // Note file descriptor is non-blocking
437 if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
// This path reports a failure to fill the receive buffer, so the message must describe
// a read error rather than a write error.
438 log(loglevel_t::WARN, "Error reading from control connection: ", strerror(errno));
449 if (rbuf.get_length() >= chklen) {
451 return !process_packet();
453 catch (std::bad_alloc &baexc) {
458 else if (rbuf.get_length() == 1024) {
460 log(loglevel_t::WARN, "Received too-large control package; dropping connection");
461 bad_conn_close = true;
462 iob.set_watches(OUT_EVENTS);
465 int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0;
466 iob.set_watches(IN_EVENTS | out_flags);
// Write queued output to the connection.
//
// With nothing queued on a connection already marked bad, a single DINIT_RP_OOM byte may
// be written (NOTE(review): confirm which additional condition guards that write).
// Otherwise the front packet of outbuf is written starting at outpkt_index, and the
// index advances by the number of bytes accepted; a fully written packet is finished
// and the next one becomes current.
//
// NOTE(review): the meaning of the return value is not evident from the statements
// here -- confirm at the call site.
472 bool control_conn_t::send_data() noexcept
474 if (outbuf.empty() && bad_conn_close) {
477 char oomBuf[] = { DINIT_RP_OOM };
// Best effort: the result of this write is deliberately not checked.
478 write(iob.get_watched_fd(), oomBuf, 1);
483 vector<char> & pkt = outbuf.front();
484 char *data = pkt.data();
// Resume the front packet where the previous write stopped.
485 int written = write(iob.get_watched_fd(), data + outpkt_index, pkt.size() - outpkt_index);
487 if (errno == EPIPE) {
491 else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
492 // spurious readiness notification?
// NOTE(review): the same message is logged at WARN level in queue_packet(); here it is
// logged at ERROR level.
495 log(loglevel_t::ERROR, "Error writing to control connection: ", strerror(errno));
501 outpkt_index += written;
502 if (outpkt_index == pkt.size()) {
503 // We've finished this packet, move on to the next:
// Queue drained and no out-of-memory close pending: a healthy connection goes back to
// watching for input only.
506 if (outbuf.empty() && ! oom_close) {
507 if (! bad_conn_close) {
508 iob.set_watches(IN_EVENTS);
// Tear down the connection: close its file descriptor, deregister the watcher from the
// event loop, detach this object as a listener from every service recorded in
// serviceKeyMap, and decrement the count of active control connections.
519 control_conn_t::~control_conn_t() noexcept
// NOTE(review): the descriptor is closed before the watcher is deregistered -- confirm
// that the event loop backend tolerates removing a watch on an already-closed
// descriptor, or deregister first.
521 close(iob.get_watched_fd());
522 iob.deregister(loop);
524 // Clear service listeners
// NOTE(review): a record that holds several handles would appear here once per handle if
// serviceKeyMap allows duplicate keys -- confirm remove_listener() copes with repeats.
525 for (auto p : serviceKeyMap) {
526 p.first->remove_listener(this);
529 active_control_conns--;