3 Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
34 #include "common/c_internal.h"
36 /******************************************************************************/
37 AsyncEngine::~AsyncEngine()
40 // Request all threads to stop
41 for (AsyncWorkerThread *workerThread : workerThreads) {
46 // Wake up all threads
47 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
48 it != workerThreads.end(); ++it) {
49 jobQueueCounter.post();
52 // Wait for threads to finish
53 for (AsyncWorkerThread *workerThread : workerThreads) {
57 // Force kill all threads
58 for (AsyncWorkerThread *workerThread : workerThreads) {
64 jobQueueMutex.unlock();
65 workerThreads.clear();
68 /******************************************************************************/
69 void AsyncEngine::registerStateInitializer(StateInitializer func)
71 stateInitializers.push_back(func);
74 /******************************************************************************/
75 void AsyncEngine::initialize(unsigned int numEngines)
79 for (unsigned int i = 0; i < numEngines; i++) {
80 AsyncWorkerThread *toAdd = new AsyncWorkerThread(this,
81 std::string("AsyncWorker-") + itos(i));
82 workerThreads.push_back(toAdd);
87 /******************************************************************************/
88 unsigned int AsyncEngine::queueAsyncJob(const std::string &func,
89 const std::string ¶ms)
93 toAdd.id = jobIdCounter++;
94 toAdd.serializedFunction = func;
95 toAdd.serializedParams = params;
97 jobQueue.push_back(toAdd);
99 jobQueueCounter.post();
101 jobQueueMutex.unlock();
106 /******************************************************************************/
107 LuaJobInfo AsyncEngine::getJob()
109 jobQueueCounter.wait();
110 jobQueueMutex.lock();
114 if (!jobQueue.empty()) {
115 retval = jobQueue.front();
116 jobQueue.pop_front();
119 jobQueueMutex.unlock();
124 /******************************************************************************/
125 void AsyncEngine::putJobResult(const LuaJobInfo &result)
127 resultQueueMutex.lock();
128 resultQueue.push_back(result);
129 resultQueueMutex.unlock();
132 /******************************************************************************/
133 void AsyncEngine::step(lua_State *L)
135 int error_handler = PUSH_ERROR_HANDLER(L);
136 lua_getglobal(L, "core");
137 resultQueueMutex.lock();
138 while (!resultQueue.empty()) {
139 LuaJobInfo jobDone = resultQueue.front();
140 resultQueue.pop_front();
142 lua_getfield(L, -1, "async_event_handler");
144 if (lua_isnil(L, -1)) {
145 FATAL_ERROR("Async event handler does not exist!");
148 luaL_checktype(L, -1, LUA_TFUNCTION);
150 lua_pushinteger(L, jobDone.id);
151 lua_pushlstring(L, jobDone.serializedResult.data(),
152 jobDone.serializedResult.size());
154 PCALL_RESL(L, lua_pcall(L, 2, 0, error_handler));
156 resultQueueMutex.unlock();
157 lua_pop(L, 2); // Pop core and error handler
160 /******************************************************************************/
161 void AsyncEngine::pushFinishedJobs(lua_State* L) {
163 MutexAutoLock l(resultQueueMutex);
165 unsigned int index = 1;
166 lua_createtable(L, resultQueue.size(), 0);
167 int top = lua_gettop(L);
169 while (!resultQueue.empty()) {
170 LuaJobInfo jobDone = resultQueue.front();
171 resultQueue.pop_front();
173 lua_createtable(L, 0, 2); // Pre-allocate space for two map fields
174 int top_lvl2 = lua_gettop(L);
176 lua_pushstring(L, "jobid");
177 lua_pushnumber(L, jobDone.id);
178 lua_settable(L, top_lvl2);
180 lua_pushstring(L, "retval");
181 lua_pushlstring(L, jobDone.serializedResult.data(),
182 jobDone.serializedResult.size());
183 lua_settable(L, top_lvl2);
185 lua_rawseti(L, top, index++);
189 /******************************************************************************/
190 void AsyncEngine::prepareEnvironment(lua_State* L, int top)
192 for (StateInitializer &stateInitializer : stateInitializers) {
193 stateInitializer(L, top);
197 /******************************************************************************/
198 AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
199 const std::string &name) :
201 ScriptApiBase(ScriptingType::Async),
202 jobDispatcher(jobDispatcher)
204 lua_State *L = getStack();
206 // Prepare job lua environment
207 lua_getglobal(L, "core");
208 int top = lua_gettop(L);
210 // Push builtin initialization type
211 lua_pushstring(L, "async");
212 lua_setglobal(L, "INIT");
214 jobDispatcher->prepareEnvironment(L, top);
217 /******************************************************************************/
218 AsyncWorkerThread::~AsyncWorkerThread()
220 sanity_check(!isRunning());
223 /******************************************************************************/
224 void* AsyncWorkerThread::run()
226 lua_State *L = getStack();
228 std::string script = getServer()->getBuiltinLuaPath() + DIR_DELIM + "init.lua";
231 } catch (const ModError &e) {
232 errorstream << "Execution of async base environment failed: "
233 << e.what() << std::endl;
234 FATAL_ERROR("Execution of async base environment failed");
237 int error_handler = PUSH_ERROR_HANDLER(L);
239 lua_getglobal(L, "core");
240 if (lua_isnil(L, -1)) {
241 FATAL_ERROR("Unable to find core within async environment!");
245 while (!stopRequested()) {
247 LuaJobInfo toProcess = jobDispatcher->getJob();
249 if (!toProcess.valid || stopRequested()) {
253 lua_getfield(L, -1, "job_processor");
254 if (lua_isnil(L, -1)) {
255 FATAL_ERROR("Unable to get async job processor!");
258 luaL_checktype(L, -1, LUA_TFUNCTION);
262 toProcess.serializedFunction.data(),
263 toProcess.serializedFunction.size());
265 toProcess.serializedParams.data(),
266 toProcess.serializedParams.size());
268 int result = lua_pcall(L, 2, 1, error_handler);
271 toProcess.serializedResult = "";
275 const char *retval = lua_tolstring(L, -1, &length);
276 toProcess.serializedResult = std::string(retval, length);
279 lua_pop(L, 1); // Pop retval
282 jobDispatcher->putJobResult(toProcess);
285 lua_pop(L, 2); // Pop core and error handler