Use a BidiFdWatcher for control connections. One step closer to
[oweals/dinit.git] / src / control.h
1 #ifndef DINIT_CONTROL_H
2 #define DINIT_CONTROL_H
3
4 #include <list>
5 #include <vector>
6 #include <unordered_map>
7 #include <limits>
8 #include <cstddef>
9
10 #include <unistd.h>
11
12 #include "dasync.h"
13
14 #include "dinit-log.h"
15 #include "control-cmds.h"
16 #include "service-listener.h"
17 #include "cpbuffer.h"
18
19 // Control connection for dinit
20
21 using namespace dasync;
22 using EventLoop_t = EventLoop<NullMutex>;
23
24 class ControlConn;
25 class ControlConnWatcher;
26
27 // forward-declaration of callback:
28 static void control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
29
30 // Pointer to the control connection that is listening for rollback completion
31 extern ControlConn * rollback_handler_conn;
32
33 extern int active_control_conns;
34
35 // "packet" format:
36 // (1 byte) packet type
37 // (N bytes) additional data (service name, etc)
38 //   for LOADSERVICE/FINDSERVICE:
39 //      (2 bytes) service name length
40 //      (M bytes) service name (without nul terminator)
41
42 // Information packet:
43 // (1 byte) packet type, >= 100
44 // (1 byte) packet length (including all fields)
45 //       N bytes: packet data (N = (length - 2))
46
47 class ServiceSet;
48 class ServiceRecord;
49
50 class ControlConnWatcher : public PosixBidiFdWatcher<NullMutex>
51 {
52     inline Rearm receiveEvent(EventLoop_t * loop, int fd, int flags) noexcept;
53
54     Rearm readReady(EventLoop_t * loop, int fd) noexcept override
55     {
56         return receiveEvent(loop, fd, in_events);
57     }
58     
59     Rearm writeReady(EventLoop_t * loop, int fd) noexcept override
60     {
61         return receiveEvent(loop, fd, out_events);
62     }
63     
64     public:
65     int fd; // TODO this is already stored, find a better way to access it.
66     EventLoop_t * eventLoop;
67     int watch_flags;
68     
69     void setWatchFlags(int flags) noexcept
70     {
71         PosixBidiFdWatcher<NullMutex>::setWatchFlags(eventLoop, flags);
72         watch_flags = flags;
73     }
74     
75     void registerWith(EventLoop_t *loop, int fd, int flags)
76     {
77         this->fd = fd;
78         PosixBidiFdWatcher<NullMutex>::registerWith(loop, fd, flags);
79         watch_flags = flags;
80     }
81 };
82
83 inline Rearm ControlConnWatcher::receiveEvent(EventLoop_t * loop, int fd, int flags) noexcept
84 {
85     PosixBidiFdWatcher<NullMutex>::setWatchFlags(loop, watch_flags);
86     control_conn_cb(loop, this, flags);
87     return Rearm::NOOP;
88 }
89
90
91 class ControlConn : private ServiceListener
92 {
93     friend void control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
94     
95     ControlConnWatcher iob;
96     EventLoop_t *loop;
97     ServiceSet *service_set;
98     
99     bool bad_conn_close = false; // close when finished output?
100     bool oom_close = false;      // send final 'out of memory' indicator
101
102     // The packet length before we need to re-check if the packet is complete.
103     // processPacket() will not be called until the packet reaches this size.
104     int chklen;
105     
106     // Receive buffer
107     CPBuffer<1024> rbuf;
108     
109     template <typename T> using list = std::list<T>;
110     template <typename T> using vector = std::vector<T>;
111     
112     // A mapping between service records and their associated numerical identifier used
113     // in communction
114     using handle_t = uint32_t;
115     std::unordered_multimap<ServiceRecord *, handle_t> serviceKeyMap;
116     std::unordered_map<handle_t, ServiceRecord *> keyServiceMap;
117     
118     // Buffer for outgoing packets. Each outgoing back is represented as a vector<char>.
119     list<vector<char>> outbuf;
120     // Current index within the first outgoing packet (all previous bytes have been sent).
121     unsigned outpkt_index = 0;
122     
123     // Queue a packet to be sent
124     //  Returns:  true if the packet was successfully queued, false if otherwise
125     //            (eg if out of memory); in the latter case the connection might
126     //            no longer be valid (iff there are no outgoing packets queued).
127     bool queuePacket(vector<char> &&v) noexcept;
128     bool queuePacket(const char *pkt, unsigned size) noexcept;
129
130     // Process a packet. Can cause the ControlConn to be deleted iff there are no
131     // outgoing packets queued.
132     // Throws:
133     //    std::bad_alloc - if an out-of-memory condition prevents processing
134     void processPacket();
135     
136     // Process a STARTSERVICE/STOPSERVICE packet. May throw std::bad_alloc.
137     void processStartStop(int pktType);
138     
139     // Process a FINDSERVICE/LOADSERVICE packet. May throw std::bad_alloc.
140     void processFindLoad(int pktType);
141
142     // Process an UNPINSERVICE packet. May throw std::bad_alloc.
143     void processUnpinService();
144
145     // Notify that data is ready to be read from the socket. Returns true in cases where the
146     // connection was deleted with potentially pending outgoing packets.
147     bool dataReady() noexcept;
148     
149     void sendData() noexcept;
150     
151     // Allocate a new handle for a service; may throw std::bad_alloc
152     handle_t allocateServiceHandle(ServiceRecord *record);
153     
154     ServiceRecord *findServiceForKey(uint32_t key)
155     {
156         try {
157             return keyServiceMap.at(key);
158         }
159         catch (std::out_of_range &exc) {
160             return nullptr;
161         }
162     }
163     
164     // Close connection due to out-of-memory condition.
165     void doOomClose()
166     {
167         bad_conn_close = true;
168         oom_close = true;
169         iob.setWatchFlags(out_events);
170     }
171     
172     // Process service event broadcast.
173     void serviceEvent(ServiceRecord * service, ServiceEvent event) noexcept final override
174     {
175         // For each service handle corresponding to the event, send an information packet.
176         auto range = serviceKeyMap.equal_range(service);
177         auto & i = range.first;
178         auto & end = range.second;
179         try {
180             while (i != end) {
181                 uint32_t key = i->second;
182                 std::vector<char> pkt;
183                 constexpr int pktsize = 3 + sizeof(key);
184                 pkt.reserve(pktsize);
185                 pkt.push_back(DINIT_IP_SERVICEEVENT);
186                 pkt.push_back(pktsize);
187                 char * p = (char *) &key;
188                 for (int j = 0; j < (int)sizeof(key); j++) {
189                     pkt.push_back(*p++);
190                 }
191                 pkt.push_back(static_cast<char>(event));
192                 queuePacket(std::move(pkt));
193                 ++i;
194             }
195         }
196         catch (std::bad_alloc &exc) {
197             doOomClose();
198         }
199     }
200     
201     public:
202     ControlConn(EventLoop_t * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0)
203     {
204         iob.registerWith(loop, fd, in_events);
205         active_control_conns++;
206     }
207     
208     bool rollbackComplete() noexcept;
209         
210     virtual ~ControlConn() noexcept;
211 };
212
213
214 static void control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents)
215 {
216     char * cc_addr = (reinterpret_cast<char *>(watcher)) - offsetof(ControlConn, iob);
217     ControlConn *conn = reinterpret_cast<ControlConn *>(cc_addr);
218     if (revents & in_events) {
219         if (conn->dataReady()) {
220             // ControlConn was deleted
221             return;
222         }
223     }
224     if (revents & out_events) {
225         conn->sendData();
226     }    
227 }
228
229 #endif