Add core.remove_detached_inventory (#7684)
[oweals/minetest.git] / src / script / cpp_api / s_async.cpp
1 /*
2 Minetest
3 Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include <cstdio>
21 #include <cstdlib>
22
23 extern "C" {
24 #include "lua.h"
25 #include "lauxlib.h"
26 #include "lualib.h"
27 }
28
29 #include "server.h"
30 #include "s_async.h"
31 #include "log.h"
32 #include "filesys.h"
33 #include "porting.h"
34 #include "common/c_internal.h"
35
36 /******************************************************************************/
37 AsyncEngine::~AsyncEngine()
38 {
39
40         // Request all threads to stop
41         for (AsyncWorkerThread *workerThread : workerThreads) {
42                 workerThread->stop();
43         }
44
45
46         // Wake up all threads
47         for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
48                         it != workerThreads.end(); ++it) {
49                 jobQueueCounter.post();
50         }
51
52         // Wait for threads to finish
53         for (AsyncWorkerThread *workerThread : workerThreads) {
54                 workerThread->wait();
55         }
56
57         // Force kill all threads
58         for (AsyncWorkerThread *workerThread : workerThreads) {
59                 delete workerThread;
60         }
61
62         jobQueueMutex.lock();
63         jobQueue.clear();
64         jobQueueMutex.unlock();
65         workerThreads.clear();
66 }
67
68 /******************************************************************************/
69 void AsyncEngine::registerStateInitializer(StateInitializer func)
70 {
71         stateInitializers.push_back(func);
72 }
73
74 /******************************************************************************/
75 void AsyncEngine::initialize(unsigned int numEngines)
76 {
77         initDone = true;
78
79         for (unsigned int i = 0; i < numEngines; i++) {
80                 AsyncWorkerThread *toAdd = new AsyncWorkerThread(this,
81                         std::string("AsyncWorker-") + itos(i));
82                 workerThreads.push_back(toAdd);
83                 toAdd->start();
84         }
85 }
86
87 /******************************************************************************/
88 unsigned int AsyncEngine::queueAsyncJob(const std::string &func,
89                 const std::string &params)
90 {
91         jobQueueMutex.lock();
92         LuaJobInfo toAdd;
93         toAdd.id = jobIdCounter++;
94         toAdd.serializedFunction = func;
95         toAdd.serializedParams = params;
96
97         jobQueue.push_back(toAdd);
98
99         jobQueueCounter.post();
100
101         jobQueueMutex.unlock();
102
103         return toAdd.id;
104 }
105
106 /******************************************************************************/
107 LuaJobInfo AsyncEngine::getJob()
108 {
109         jobQueueCounter.wait();
110         jobQueueMutex.lock();
111
112         LuaJobInfo retval;
113
114         if (!jobQueue.empty()) {
115                 retval = jobQueue.front();
116                 jobQueue.pop_front();
117                 retval.valid = true;
118         }
119         jobQueueMutex.unlock();
120
121         return retval;
122 }
123
124 /******************************************************************************/
125 void AsyncEngine::putJobResult(const LuaJobInfo &result)
126 {
127         resultQueueMutex.lock();
128         resultQueue.push_back(result);
129         resultQueueMutex.unlock();
130 }
131
132 /******************************************************************************/
133 void AsyncEngine::step(lua_State *L)
134 {
135         int error_handler = PUSH_ERROR_HANDLER(L);
136         lua_getglobal(L, "core");
137         resultQueueMutex.lock();
138         while (!resultQueue.empty()) {
139                 LuaJobInfo jobDone = resultQueue.front();
140                 resultQueue.pop_front();
141
142                 lua_getfield(L, -1, "async_event_handler");
143
144                 if (lua_isnil(L, -1)) {
145                         FATAL_ERROR("Async event handler does not exist!");
146                 }
147
148                 luaL_checktype(L, -1, LUA_TFUNCTION);
149
150                 lua_pushinteger(L, jobDone.id);
151                 lua_pushlstring(L, jobDone.serializedResult.data(),
152                                 jobDone.serializedResult.size());
153
154                 PCALL_RESL(L, lua_pcall(L, 2, 0, error_handler));
155         }
156         resultQueueMutex.unlock();
157         lua_pop(L, 2); // Pop core and error handler
158 }
159
160 /******************************************************************************/
161 void AsyncEngine::pushFinishedJobs(lua_State* L) {
162         // Result Table
163         MutexAutoLock l(resultQueueMutex);
164
165         unsigned int index = 1;
166         lua_createtable(L, resultQueue.size(), 0);
167         int top = lua_gettop(L);
168
169         while (!resultQueue.empty()) {
170                 LuaJobInfo jobDone = resultQueue.front();
171                 resultQueue.pop_front();
172
173                 lua_createtable(L, 0, 2);  // Pre-allocate space for two map fields
174                 int top_lvl2 = lua_gettop(L);
175
176                 lua_pushstring(L, "jobid");
177                 lua_pushnumber(L, jobDone.id);
178                 lua_settable(L, top_lvl2);
179
180                 lua_pushstring(L, "retval");
181                 lua_pushlstring(L, jobDone.serializedResult.data(),
182                         jobDone.serializedResult.size());
183                 lua_settable(L, top_lvl2);
184
185                 lua_rawseti(L, top, index++);
186         }
187 }
188
189 /******************************************************************************/
190 void AsyncEngine::prepareEnvironment(lua_State* L, int top)
191 {
192         for (StateInitializer &stateInitializer : stateInitializers) {
193                 stateInitializer(L, top);
194         }
195 }
196
197 /******************************************************************************/
198 AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
199                 const std::string &name) :
200         Thread(name),
201         ScriptApiBase(ScriptingType::Async),
202         jobDispatcher(jobDispatcher)
203 {
204         lua_State *L = getStack();
205
206         // Prepare job lua environment
207         lua_getglobal(L, "core");
208         int top = lua_gettop(L);
209
210         // Push builtin initialization type
211         lua_pushstring(L, "async");
212         lua_setglobal(L, "INIT");
213
214         jobDispatcher->prepareEnvironment(L, top);
215 }
216
217 /******************************************************************************/
218 AsyncWorkerThread::~AsyncWorkerThread()
219 {
220         sanity_check(!isRunning());
221 }
222
223 /******************************************************************************/
224 void* AsyncWorkerThread::run()
225 {
226         lua_State *L = getStack();
227
228         std::string script = getServer()->getBuiltinLuaPath() + DIR_DELIM + "init.lua";
229         try {
230                 loadScript(script);
231         } catch (const ModError &e) {
232                 errorstream << "Execution of async base environment failed: "
233                         << e.what() << std::endl;
234                 FATAL_ERROR("Execution of async base environment failed");
235         }
236
237         int error_handler = PUSH_ERROR_HANDLER(L);
238
239         lua_getglobal(L, "core");
240         if (lua_isnil(L, -1)) {
241                 FATAL_ERROR("Unable to find core within async environment!");
242         }
243
244         // Main loop
245         while (!stopRequested()) {
246                 // Wait for job
247                 LuaJobInfo toProcess = jobDispatcher->getJob();
248
249                 if (!toProcess.valid || stopRequested()) {
250                         continue;
251                 }
252
253                 lua_getfield(L, -1, "job_processor");
254                 if (lua_isnil(L, -1)) {
255                         FATAL_ERROR("Unable to get async job processor!");
256                 }
257
258                 luaL_checktype(L, -1, LUA_TFUNCTION);
259
260                 // Call it
261                 lua_pushlstring(L,
262                                 toProcess.serializedFunction.data(),
263                                 toProcess.serializedFunction.size());
264                 lua_pushlstring(L,
265                                 toProcess.serializedParams.data(),
266                                 toProcess.serializedParams.size());
267
268                 int result = lua_pcall(L, 2, 1, error_handler);
269                 if (result) {
270                         PCALL_RES(result);
271                         toProcess.serializedResult = "";
272                 } else {
273                         // Fetch result
274                         size_t length;
275                         const char *retval = lua_tolstring(L, -1, &length);
276                         toProcess.serializedResult = std::string(retval, length);
277                 }
278
279                 lua_pop(L, 1);  // Pop retval
280
281                 // Put job result
282                 jobDispatcher->putJobResult(toProcess);
283         }
284
285         lua_pop(L, 2);  // Pop core and error handler
286
287         return 0;
288 }
289