Rip out libev, replace with dasynq (new library written for the purpose).
[oweals/dinit.git] / src / control.h
1 #ifndef DINIT_CONTROL_H
2 #define DINIT_CONTROL_H
3
4 #include <list>
5 #include <vector>
6 #include <unordered_map>
7 #include <limits>
8 #include <cstddef>
9
10 #include <unistd.h>
11
12 #include "dasync.h"
13
14 #include "dinit-log.h"
15 #include "control-cmds.h"
16 #include "service-listener.h"
17 #include "cpbuffer.h"
18
19 // Control connection for dinit
20
21 using namespace dasync;
22 using EventLoop_t = EventLoop<NullMutex>;
23
24 // TODO: Use the input buffer as a circular buffer, instead of chomping data from
25 // the front using a data move.
26
27 class ControlConn;
28 class ControlConnWatcher;
29
30 // forward-declaration of callback:
31 static void control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
32
33 // Pointer to the control connection that is listening for rollback completion
34 extern ControlConn * rollback_handler_conn;
35
36 extern int active_control_conns;
37
38 // "packet" format:
39 // (1 byte) packet type
40 // (N bytes) additional data (service name, etc)
41 //   for LOADSERVICE/FINDSERVICE:
42 //      (2 bytes) service name length
43 //      (M bytes) service name (without nul terminator)
44
45 // Information packet:
46 // (1 byte) packet type, >= 100
47 // (1 byte) packet length (including all fields)
48 //       N bytes: packet data (N = (length - 2))
49
50 class ServiceSet;
51 class ServiceRecord;
52
53 class ControlConnWatcher : public PosixFdWatcher<NullMutex>
54 {
55     Rearm gotEvent(EventLoop_t * loop, int fd, int flags) override
56     {
57         control_conn_cb(loop, this, flags);
58         return Rearm::REARM;
59     }
60     
61     public:
62     int fd; // TODO this is already stored, find a better way to access it.
63     
64     using PosixFdWatcher<NullMutex>::setWatchFlags;
65     
66     void registerWith(EventLoop_t *loop, int fd, int flags)
67     {
68         this->fd = fd;
69         PosixFdWatcher<NullMutex>::registerWith(loop, fd, flags);
70     }
71 };
72
73 class ControlConn : private ServiceListener
74 {
75     friend void control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
76     
77     ControlConnWatcher iob;
78     EventLoop_t *loop;
79     ServiceSet *service_set;
80     
81     bool bad_conn_close = false; // close when finished output?
82     bool oom_close = false;      // send final 'out of memory' indicator
83
84     // The packet length before we need to re-check if the packet is complete.
85     // processPacket() will not be called until the packet reaches this size.
86     int chklen;
87     
88     // Receive buffer
89     CPBuffer<1024> rbuf;
90     
91     template <typename T> using list = std::list<T>;
92     template <typename T> using vector = std::vector<T>;
93     
94     // A mapping between service records and their associated numerical identifier used
95     // in communction
96     using handle_t = uint32_t;
97     std::unordered_multimap<ServiceRecord *, handle_t> serviceKeyMap;
98     std::unordered_map<handle_t, ServiceRecord *> keyServiceMap;
99     
100     // Buffer for outgoing packets. Each outgoing back is represented as a vector<char>.
101     list<vector<char>> outbuf;
102     // Current index within the first outgoing packet (all previous bytes have been sent).
103     unsigned outpkt_index = 0;
104     
105     // Queue a packet to be sent
106     //  Returns:  true if the packet was successfully queued, false if otherwise
107     //            (eg if out of memory); in the latter case the connection might
108     //            no longer be valid (iff there are no outgoing packets queued).
109     bool queuePacket(vector<char> &&v) noexcept;
110     bool queuePacket(const char *pkt, unsigned size) noexcept;
111
112     // Process a packet. Can cause the ControlConn to be deleted iff there are no
113     // outgoing packets queued.
114     // Throws:
115     //    std::bad_alloc - if an out-of-memory condition prevents processing
116     void processPacket();
117     
118     // Process a STARTSERVICE/STOPSERVICE packet. May throw std::bad_alloc.
119     void processStartStop(int pktType);
120     
121     // Process a FINDSERVICE/LOADSERVICE packet. May throw std::bad_alloc.
122     void processFindLoad(int pktType);
123
124     // Process an UNPINSERVICE packet. May throw std::bad_alloc.
125     void processUnpinService();
126
127     // Notify that data is ready to be read from the socket. Returns true in cases where the
128     // connection was deleted with potentially pending outgoing packets.
129     bool dataReady() noexcept;
130     
131     void sendData() noexcept;
132     
133     // Allocate a new handle for a service; may throw std::bad_alloc
134     handle_t allocateServiceHandle(ServiceRecord *record);
135     
136     ServiceRecord *findServiceForKey(uint32_t key)
137     {
138         try {
139             return keyServiceMap.at(key);
140         }
141         catch (std::out_of_range &exc) {
142             return nullptr;
143         }
144     }
145     
146     // Close connection due to out-of-memory condition.
147     void doOomClose()
148     {
149         bad_conn_close = true;
150         oom_close = true;
151         iob.setWatchFlags(out_events);
152         //ev_io_set(&iob, iob.fd, EV_WRITE);
153     }
154     
155     // Process service event broadcast.
156     void serviceEvent(ServiceRecord * service, ServiceEvent event) noexcept final override
157     {
158         // For each service handle corresponding to the event, send an information packet.
159         auto range = serviceKeyMap.equal_range(service);
160         auto & i = range.first;
161         auto & end = range.second;
162         try {
163             while (i != end) {
164                 uint32_t key = i->second;
165                 std::vector<char> pkt;
166                 constexpr int pktsize = 3 + sizeof(key);
167                 pkt.reserve(pktsize);
168                 pkt.push_back(DINIT_IP_SERVICEEVENT);
169                 pkt.push_back(pktsize);
170                 char * p = (char *) &key;
171                 for (int j = 0; j < (int)sizeof(key); j++) {
172                     pkt.push_back(*p++);
173                 }
174                 pkt.push_back(static_cast<char>(event));
175                 queuePacket(std::move(pkt));
176                 ++i;
177             }
178         }
179         catch (std::bad_alloc &exc) {
180             doOomClose();
181         }
182     }
183     
184     public:
185     ControlConn(EventLoop_t * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0)
186     {
187         //ev_io_init(&iob, control_conn_cb, fd, EV_READ);
188         //iob.data = this;
189         //ev_io_start(loop, &iob);
190         iob.registerWith(loop, fd, in_events);
191         
192         active_control_conns++;
193     }
194     
195     bool rollbackComplete() noexcept;
196         
197     virtual ~ControlConn() noexcept;
198 };
199
200
201 static void control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents)
202 {
203     char * cc_addr = (reinterpret_cast<char *>(watcher)) - offsetof(ControlConn, iob);
204     ControlConn *conn = reinterpret_cast<ControlConn *>(cc_addr);
205     if (revents & in_events) {
206         if (conn->dataReady()) {
207             // ControlConn was deleted
208             return;
209         }
210     }
211     if (revents & out_events) {
212         conn->sendData();
213     }    
214 }
215
216 #endif