Yet another Dasynq update
[oweals/dinit.git] / src / control.cc
1 #include "control.h"
2 #include "service.h"
3
4 bool ControlConn::processPacket()
5 {
6     using std::string;
7     
8     // Note that where we call queuePacket, we must generally check the return value. If it
9     // returns false it has either deleted the connection or marked it for deletion; we
10     // shouldn't touch instance members after that point.
11
12     int pktType = rbuf[0];
13     if (pktType == DINIT_CP_QUERYVERSION) {
14         // Responds with:
15         // DINIT_RP_CVERSION, (2 byte) minimum compatible version, (2 byte) maximum compatible version
16         char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 };
17         if (! queuePacket(replyBuf, 1)) return false;
18         rbuf.consume(1);
19         return true;
20     }
21     if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) {
22         return processFindLoad(pktType);
23     }
24     if (pktType == DINIT_CP_STARTSERVICE || pktType == DINIT_CP_STOPSERVICE
25             || pktType == DINIT_CP_WAKESERVICE || pktType == DINIT_CP_RELEASESERVICE) {
26         return processStartStop(pktType);
27     }
28     if (pktType == DINIT_CP_UNPINSERVICE) {
29         return processUnpinService();
30     }
31     if (pktType == DINIT_CP_SHUTDOWN) {
32         // Shutdown/reboot
33         if (rbuf.get_length() < 2) {
34             chklen = 2;
35             return true;
36         }
37         
38         auto sd_type = static_cast<ShutdownType>(rbuf[1]);
39         
40         service_set->stop_all_services(sd_type);
41         char ackBuf[] = { DINIT_RP_ACK };
42         if (! queuePacket(ackBuf, 1)) return false;
43         
44         // Clear the packet from the buffer
45         rbuf.consume(2);
46         chklen = 0;
47         return true;
48     }
49     else {
50         // Unrecognized: give error response
51         char outbuf[] = { DINIT_RP_BADREQ };
52         if (! queuePacket(outbuf, 1)) return false;
53         bad_conn_close = true;
54         iob.setWatchFlags(OUT_EVENTS);
55     }
56     return true;
57 }
58
59 bool ControlConn::processFindLoad(int pktType)
60 {
61     using std::string;
62     
63     constexpr int pkt_size = 4;
64     
65     if (rbuf.get_length() < pkt_size) {
66         chklen = pkt_size;
67         return true;
68     }
69     
70     uint16_t svcSize;
71     rbuf.extract((char *)&svcSize, 1, 2);
72     chklen = svcSize + 3; // packet type + (2 byte) length + service name
73     if (svcSize <= 0 || chklen > 1024) {
74         // Queue error response / mark connection bad
75         char badreqRep[] = { DINIT_RP_BADREQ };
76         if (! queuePacket(badreqRep, 1)) return false;
77         bad_conn_close = true;
78         iob.setWatchFlags(OUT_EVENTS);
79         return true;
80     }
81     
82     if (rbuf.get_length() < chklen) {
83         // packet not complete yet; read more
84         return true;
85     }
86     
87     ServiceRecord * record = nullptr;
88     
89     string serviceName = rbuf.extract_string(3, svcSize);
90     
91     if (pktType == DINIT_CP_LOADSERVICE) {
92         // LOADSERVICE
93         try {
94             record = service_set->loadService(serviceName);
95         }
96         catch (ServiceLoadExc &slexc) {
97             log(LogLevel::ERROR, "Could not load service ", slexc.serviceName, ": ", slexc.excDescription);
98         }
99     }
100     else {
101         // FINDSERVICE
102         record = service_set->findService(serviceName.c_str());
103     }
104     
105     if (record != nullptr) {
106         // Allocate a service handle
107         handle_t handle = allocateServiceHandle(record);
108         std::vector<char> rp_buf;
109         rp_buf.reserve(7);
110         rp_buf.push_back(DINIT_RP_SERVICERECORD);
111         rp_buf.push_back(static_cast<char>(record->getState()));
112         for (int i = 0; i < (int) sizeof(handle); i++) {
113             rp_buf.push_back(*(((char *) &handle) + i));
114         }
115         rp_buf.push_back(static_cast<char>(record->getTargetState()));
116         if (! queuePacket(std::move(rp_buf))) return false;
117     }
118     else {
119         std::vector<char> rp_buf = { DINIT_RP_NOSERVICE };
120         if (! queuePacket(std::move(rp_buf))) return false;
121     }
122     
123     // Clear the packet from the buffer
124     rbuf.consume(chklen);
125     chklen = 0;
126     return true;
127 }
128
129 bool ControlConn::processStartStop(int pktType)
130 {
131     using std::string;
132     
133     constexpr int pkt_size = 2 + sizeof(handle_t);
134     
135     if (rbuf.get_length() < pkt_size) {
136         chklen = pkt_size;
137         return true;
138     }
139     
140     // 1 byte: packet type
141     // 1 byte: pin in requested state (0 = no pin, 1 = pin)
142     // 4 bytes: service handle
143     
144     bool do_pin = (rbuf[1] == 1);
145     handle_t handle;
146     rbuf.extract((char *) &handle, 2, sizeof(handle));
147     
148     ServiceRecord *service = findServiceForKey(handle);
149     if (service == nullptr) {
150         // Service handle is bad
151         char badreqRep[] = { DINIT_RP_BADREQ };
152         if (! queuePacket(badreqRep, 1)) return false;
153         bad_conn_close = true;
154         iob.setWatchFlags(OUT_EVENTS);
155         return true;
156     }
157     else {
158         bool already_there = false;
159         switch (pktType) {
160         case DINIT_CP_STARTSERVICE:
161             // start service, mark as required
162             if (do_pin) service->pinStart();
163             service->start();
164             service_set->processQueues(true);
165             already_there = service->getState() == ServiceState::STARTED;
166             break;
167         case DINIT_CP_STOPSERVICE:
168             // force service to stop
169             if (do_pin) service->pinStop();
170             service->stop(true);
171             service->forceStop();
172             service_set->processQueues(false);
173             already_there = service->getState() == ServiceState::STOPPED;
174             break;
175         case DINIT_CP_WAKESERVICE:
176             // re-start a stopped service (do not mark as required)
177             if (do_pin) service->pinStart();
178             service->start(false);
179             service_set->processQueues(true);
180             already_there = service->getState() == ServiceState::STARTED;
181             break;
182         case DINIT_CP_RELEASESERVICE:
183             // remove required mark, stop if not required by dependents
184             if (do_pin) service->pinStop();
185             service->stop();
186             service_set->processQueues(false);
187             already_there = service->getState() == ServiceState::STOPPED;
188             break;
189         default:
190             // TODO return an error
191             break;
192         }
193         
194         char ack_buf[] = { (char)(already_there ? DINIT_RP_ALREADYSS : DINIT_RP_ACK) };
195         
196         if (! queuePacket(ack_buf, 1)) return false;
197     }
198     
199     // Clear the packet from the buffer
200     rbuf.consume(pkt_size);
201     chklen = 0;
202     return true;
203 }
204
205 bool ControlConn::processUnpinService()
206 {
207     using std::string;
208     
209     constexpr int pkt_size = 1 + sizeof(handle_t);
210     
211     if (rbuf.get_length() < pkt_size) {
212         chklen = pkt_size;
213         return true;
214     }
215     
216     // 1 byte: packet type
217     // 4 bytes: service handle
218     
219     handle_t handle;
220     rbuf.extract((char *) &handle, 1, sizeof(handle));
221     
222     ServiceRecord *service = findServiceForKey(handle);
223     if (service == nullptr) {
224         // Service handle is bad
225         char badreqRep[] = { DINIT_RP_BADREQ };
226         if (! queuePacket(badreqRep, 1)) return false;
227         bad_conn_close = true;
228         iob.setWatchFlags(OUT_EVENTS);
229         return true;
230     }
231     else {
232         service->unpin();
233         service_set->processQueues(true);
234         char ack_buf[] = { (char) DINIT_RP_ACK };
235         if (! queuePacket(ack_buf, 1)) return false;
236     }
237     
238     // Clear the packet from the buffer
239     rbuf.consume(pkt_size);
240     chklen = 0;
241     return true;
242 }
243
244 ControlConn::handle_t ControlConn::allocateServiceHandle(ServiceRecord *record)
245 {
246     bool is_unique = true;
247     handle_t largest_seen = 0;
248     handle_t candidate = 0;
249     for (auto p : keyServiceMap) {
250         if (p.first > largest_seen) largest_seen = p.first;
251         if (p.first == candidate) {
252             if (largest_seen == std::numeric_limits<handle_t>::max()) throw std::bad_alloc();
253             candidate = largest_seen + 1;
254         }
255         is_unique &= (p.second != record);
256     }
257     
258     keyServiceMap[candidate] = record;
259     serviceKeyMap.insert(std::make_pair(record, candidate));
260     
261     if (is_unique) {
262         record->addListener(this);
263     }
264     
265     return candidate;
266 }
267
268
269 bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept
270 {
271     int in_flag = bad_conn_close ? 0 : IN_EVENTS;
272     bool was_empty = outbuf.empty();
273
274     // If the queue is empty, we can try to write the packet out now rather than queueing it.
275     // If the write is unsuccessful or partial, we queue the remainder.
276     if (was_empty) {
277         int wr = write(iob.fd, pkt, size);
278         if (wr == -1) {
279             if (errno == EPIPE) {
280                 return false;
281             }
282             if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
283                 // TODO log error
284                 return false;
285             }
286             // EAGAIN etc: fall through to below
287         }
288         else {
289             if ((unsigned)wr == size) {
290                 // Ok, all written.
291                 iob.setWatchFlags(in_flag);
292                 return true;
293             }
294             pkt += wr;
295             size -= wr;
296         }
297     }
298     
299     // Create a vector out of the (remaining part of the) packet:
300     try {
301         outbuf.emplace_back(pkt, pkt + size);
302         iob.setWatchFlags(in_flag | OUT_EVENTS);
303         return true;
304     }
305     catch (std::bad_alloc &baexc) {
306         // Mark the connection bad, and stop reading further requests
307         bad_conn_close = true;
308         oom_close = true;
309         if (was_empty) {
310             // We can't send out-of-memory response as we already wrote as much as we
311             // could above. Neither can we later send the response since we have currently
312             // sent an incomplete packet. All we can do is close the connection.
313             return false;
314         }
315         else {
316             iob.setWatchFlags(OUT_EVENTS);
317             return true;
318         }
319     }
320 }
321
322 // This queuePacket method is frustratingly similar to the one above, but the subtle differences
323 // make them extraordinary difficult to combine into a single method.
324 bool ControlConn::queuePacket(std::vector<char> &&pkt) noexcept
325 {
326     int in_flag = bad_conn_close ? 0 : IN_EVENTS;
327     bool was_empty = outbuf.empty();
328     
329     if (was_empty) {
330         outpkt_index = 0;
331         // We can try sending the packet immediately:
332         int wr = write(iob.fd, pkt.data(), pkt.size());
333         if (wr == -1) {
334             if (errno == EPIPE) {
335                 return false;
336             }
337             if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
338                 // TODO log error
339                 return false;
340             }
341             // EAGAIN etc: fall through to below
342         }
343         else {
344             if ((unsigned)wr == pkt.size()) {
345                 // Ok, all written.
346                 iob.setWatchFlags(in_flag);
347                 return true;
348             }
349             outpkt_index = wr;
350         }
351     }
352     
353     try {
354         outbuf.emplace_back(pkt);
355         iob.setWatchFlags(in_flag | OUT_EVENTS);
356         return true;
357     }
358     catch (std::bad_alloc &baexc) {
359         // Mark the connection bad, and stop reading further requests
360         bad_conn_close = true;
361         oom_close = true;
362         if (was_empty) {
363             // We can't send out-of-memory response as we already wrote as much as we
364             // could above. Neither can we later send the response since we have currently
365             // sent an incomplete packet. All we can do is close the connection.
366             return false;
367         }
368         else {
369             iob.setWatchFlags(OUT_EVENTS);
370             return true;
371         }
372     }
373 }
374
375 bool ControlConn::rollbackComplete() noexcept
376 {
377     char ackBuf[2] = { DINIT_ROLLBACK_COMPLETED, 2 };
378     return queuePacket(ackBuf, 2);
379 }
380
381 bool ControlConn::dataReady() noexcept
382 {
383     int fd = iob.fd;
384     
385     int r = rbuf.fill(fd);
386     
387     // Note file descriptor is non-blocking
388     if (r == -1) {
389         if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
390             // TODO log error
391             return true;
392         }
393         return false;
394     }
395     
396     if (r == 0) {
397         return true;
398     }
399     
400     // complete packet?
401     if (rbuf.get_length() >= chklen) {
402         try {
403             return !processPacket();
404         }
405         catch (std::bad_alloc &baexc) {
406             doOomClose();
407             return false;
408         }
409     }
410     else if (rbuf.get_length() == 1024) {
411         // Too big packet
412         // TODO log error?
413         // TODO error response?
414         bad_conn_close = true;
415         iob.setWatchFlags(OUT_EVENTS);
416     }
417     else {
418         int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0;
419         iob.setWatchFlags(IN_EVENTS | out_flags);
420     }
421     
422     return false;
423 }
424
425 bool ControlConn::sendData() noexcept
426 {
427     if (outbuf.empty() && bad_conn_close) {
428         if (oom_close) {
429             // Send oom response
430             char oomBuf[] = { DINIT_RP_OOM };
431             write(iob.fd, oomBuf, 1);
432         }
433         return true;
434     }
435     
436     vector<char> & pkt = outbuf.front();
437     char *data = pkt.data();
438     int written = write(iob.fd, data + outpkt_index, pkt.size() - outpkt_index);
439     if (written == -1) {
440         if (errno == EPIPE) {
441             // read end closed
442             return true;
443         }
444         else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
445             // spurious readiness notification?
446         }
447         else {
448             log(LogLevel::ERROR, "Error writing to control connection: ", strerror(errno));
449             return true;
450         }
451         return false;
452     }
453
454     outpkt_index += written;
455     if (outpkt_index == pkt.size()) {
456         // We've finished this packet, move on to the next:
457         outbuf.pop_front();
458         outpkt_index = 0;
459         if (outbuf.empty() && ! oom_close) {
460             if (! bad_conn_close) {
461                 iob.setWatchFlags(IN_EVENTS);
462             }
463             else {
464                 return true;
465             }
466         }
467     }
468     
469     return false;
470 }
471
472 ControlConn::~ControlConn() noexcept
473 {
474     close(iob.fd);
475     iob.deregisterWatch(loop);
476     
477     // Clear service listeners
478     for (auto p : serviceKeyMap) {
479         p.first->removeListener(this);
480     }
481     
482     active_control_conns--;
483 }