From 8b0b857eaaa50c6ec217a46c0577395c78ec04c7 Mon Sep 17 00:00:00 2001 From: sapier Date: Mon, 6 Jan 2014 12:45:42 +0100 Subject: [PATCH] Make MutexQueue use jsemaphore for signaling --- src/client.cpp | 18 +++- src/client.h | 8 ++ src/connection.cpp | 5 +- src/game.cpp | 32 +++--- src/httpfetch.cpp | 2 +- src/itemdef.cpp | 1 + src/jthread/jsemaphore.h | 1 + src/jthread/pthread/jsemaphore.cpp | 31 ++++++ src/jthread/win32/jsemaphore.cpp | 15 +++ src/shader.cpp | 24 +++-- src/tile.cpp | 1 + src/util/container.h | 151 ++++++++++++++++++++--------- src/util/thread.h | 62 ++++++------ 13 files changed, 250 insertions(+), 101 deletions(-) diff --git a/src/client.cpp b/src/client.cpp index b830bcdf3..721c413c0 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -286,6 +286,20 @@ Client::Client( } } +void Client::Stop() +{ + //request all client managed threads to stop + m_mesh_update_thread.Stop(); +} + +bool Client::isShutdown() +{ + + if (!m_mesh_update_thread.IsRunning()) return true; + + return false; +} + Client::~Client() { { @@ -296,7 +310,7 @@ Client::~Client() m_mesh_update_thread.Stop(); m_mesh_update_thread.Wait(); while(!m_mesh_update_thread.m_queue_out.empty()) { - MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_front(); + MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_frontNoEx(); delete r.mesh; } @@ -692,7 +706,7 @@ void Client::step(float dtime) while(!m_mesh_update_thread.m_queue_out.empty()) { num_processed_meshes++; - MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_front(); + MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_frontNoEx(); MapBlock *block = m_env.getMap().getBlockNoCreateNoEx(r.p); if(block) { diff --git a/src/client.h b/src/client.h index 1ed80a2b0..1b7ad48e6 100644 --- a/src/client.h +++ b/src/client.h @@ -289,6 +289,14 @@ public: ); ~Client(); + + /* + request all threads managed by client to be stopped + */ + void Stop(); + + + bool isShutdown(); /* The name of the local player should already be set when calling this, as it is sent in the initialization. diff --git a/src/connection.cpp b/src/connection.cpp index 8f83f6219..bc9279649 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -592,8 +592,9 @@ void * Connection::Thread() runTimeouts(dtime); + //NOTE this is only thread safe for ONE consumer thread! while(!m_command_queue.empty()){ - ConnectionCommand c = m_command_queue.pop_front(); + ConnectionCommand c = m_command_queue.pop_frontNoEx(); processCommand(c); } @@ -1556,7 +1557,7 @@ ConnectionEvent Connection::getEvent() e.type = CONNEVENT_NONE; return e; } - return m_event_queue.pop_front(); + return m_event_queue.pop_frontNoEx(); } ConnectionEvent Connection::waitEvent(u32 timeout_ms) diff --git a/src/game.cpp b/src/game.cpp index b751a2b62..aef60484f 100644 --- a/src/game.cpp +++ b/src/game.cpp @@ -813,7 +813,7 @@ public: services->setVertexShaderConstant("animationTimer", &animation_timer_f, 1); LocalPlayer* player = m_client->getEnv().getLocalPlayer(); - v3f eye_position = player->getEyePosition(); + v3f eye_position = player->getEyePosition(); services->setPixelShaderConstant("eyePosition", (irr::f32*)&eye_position, 3); services->setVertexShaderConstant("eyePosition", (irr::f32*)&eye_position, 3); @@ -1876,12 +1876,12 @@ void the_game( } else if(input->wasKeyDown(getKeySetting("keymap_screenshot"))) { - irr::video::IImage* const image = driver->createScreenShot(); - if (image) { - irr::c8 filename[256]; - snprintf(filename, 256, "%s" DIR_DELIM "screenshot_%u.png", + irr::video::IImage* const image = driver->createScreenShot(); + if (image) { + irr::c8 filename[256]; + snprintf(filename, 256, "%s" DIR_DELIM "screenshot_%u.png", g_settings->get("screenshot_path").c_str(), - device->getTimer()->getRealTime()); + device->getTimer()->getRealTime()); if (driver->writeImageToFile(image, filename)) { std::wstringstream sstr; sstr<<"Saved screenshot to '"<drop(); - } + image->drop(); + } } else if(input->wasKeyDown(getKeySetting("keymap_toggle_hud"))) { @@ -2263,7 +2263,7 @@ void the_game( new MainRespawnInitiator( &respawn_menu_active, &client); GUIDeathScreen *menu = - new GUIDeathScreen(guienv, guiroot, -1, + new GUIDeathScreen(guienv, guiroot, -1, &g_menumgr, respawner); menu->drop(); @@ -2755,7 +2755,7 @@ void the_game( // Sign special case, at least until formspec is properly implemented. // Deprecated? - if(meta && meta->getString("formspec") == "hack:sign_text_input" + if(meta && meta->getString("formspec") == "hack:sign_text_input" && !random_input && !input->isKeyDown(getKeySetting("keymap_sneak"))) { @@ -3222,7 +3222,7 @@ void the_game( driver->getOverrideMaterial().Material.ColorMask = irr::video::ECP_RED; driver->getOverrideMaterial().EnableFlags = irr::video::EMF_COLOR_MASK; - driver->getOverrideMaterial().EnablePasses = irr::scene::ESNRP_SKY_BOX + + driver->getOverrideMaterial().EnablePasses = irr::scene::ESNRP_SKY_BOX + irr::scene::ESNRP_SOLID + irr::scene::ESNRP_TRANSPARENT + irr::scene::ESNRP_TRANSPARENT_EFFECT + @@ -3433,6 +3433,16 @@ void the_game( chat_backend.addMessage(L"", L"# Disconnected."); chat_backend.addMessage(L"", L""); + client.Stop(); + + //force answer all texture and shader jobs (TODO return empty values) + + while(!client.isShutdown()) { + tsrc->processQueue(); + shsrc->processQueue(); + sleep_ms(100); + } + // Client scope (client is destructed before destructing *def and tsrc) }while(0); } // try-catch diff --git a/src/httpfetch.cpp b/src/httpfetch.cpp index 9eed045fe..176a3b22a 100644 --- a/src/httpfetch.cpp +++ b/src/httpfetch.cpp @@ -594,7 +594,7 @@ protected: */ while (!m_requests.empty()) { - Request req = m_requests.pop_front(); + Request req = m_requests.pop_frontNoEx(); processRequest(req); } processQueued(&pool); diff --git a/src/itemdef.cpp b/src/itemdef.cpp index f77a198b5..d5e03f7b3 100644 --- a/src/itemdef.cpp +++ b/src/itemdef.cpp @@ -642,6 +642,7 @@ public: void processQueue(IGameDef *gamedef) { #ifndef SERVER + //NOTE this is only thread safe for ONE consumer thread! while(!m_get_clientcached_queue.empty()) { GetRequest diff --git a/src/jthread/jsemaphore.h b/src/jthread/jsemaphore.h index 70318d5da..b62add253 100644 --- a/src/jthread/jsemaphore.h +++ b/src/jthread/jsemaphore.h @@ -36,6 +36,7 @@ public: void Post(); void Wait(); + bool Wait(unsigned int time_ms); int GetValue(); diff --git a/src/jthread/pthread/jsemaphore.cpp b/src/jthread/pthread/jsemaphore.cpp index 962b582f1..ee1431065 100644 --- a/src/jthread/pthread/jsemaphore.cpp +++ b/src/jthread/pthread/jsemaphore.cpp @@ -17,8 +17,12 @@ with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include +#include +#include #include "jthread/jsemaphore.h" + #define UNUSED(expr) do { (void)(expr); } while (0) + JSemaphore::JSemaphore() { int sem_init_retval = sem_init(&m_semaphore,0,0); assert(sem_init_retval == 0); @@ -49,6 +53,33 @@ void JSemaphore::Wait() { UNUSED(sem_wait_retval); } +bool JSemaphore::Wait(unsigned int time_ms) { + struct timespec waittime; + struct timeval now; + + if (gettimeofday(&now, NULL) == -1) { + assert("Unable to get time by clock_gettime!" == 0); + return false; + } + + waittime.tv_nsec = ((time_ms % 1000) * 1000 * 1000) + (now.tv_usec * 1000); + waittime.tv_sec = (time_ms / 1000) + (waittime.tv_nsec / (1000*1000*1000)) + now.tv_sec; + waittime.tv_nsec %= 1000*1000*1000; + + errno = 0; + int sem_wait_retval = sem_timedwait(&m_semaphore,&waittime); + + if (sem_wait_retval == 0) + { + return true; + } + else { + assert((errno == ETIMEDOUT) || (errno == EINTR)); + return false; + } + return sem_wait_retval == 0 ? true : false; +} + int JSemaphore::GetValue() { int retval = 0; diff --git a/src/jthread/win32/jsemaphore.cpp b/src/jthread/win32/jsemaphore.cpp index 3a1f2715c..34167f391 100755 --- a/src/jthread/win32/jsemaphore.cpp +++ b/src/jthread/win32/jsemaphore.cpp @@ -51,6 +51,21 @@ void JSemaphore::Wait() { INFINITE); } +bool JSemaphore::Wait(unsigned int time_ms) { + unsigned int retval = WaitForSingleObject( + m_hSemaphore, + time_ms); + + if (retval == WAIT_OBJECT_0) + { + return true; + } + else { + assert(retval == WAIT_TIMEOUT); + return false; + } +} + int JSemaphore::GetValue() { long int retval = 0; diff --git a/src/shader.cpp b/src/shader.cpp index 39296f6a3..d29c9d3a7 100644 --- a/src/shader.cpp +++ b/src/shader.cpp @@ -427,21 +427,18 @@ u32 ShaderSource::getShaderId(const std::string &name) /* infostream<<"Waiting for shader from main thread, name=\"" < - result = result_queue.pop_front(1000); - - if (result.key == name) { - return result.item; - } + while(true) { + GetResult + result = result_queue.pop_frontNoEx(); + + if (result.key == name) { + return result.item; + } + else { + errorstream << "Got shader with invalid name: " << result.key << std::endl; } } - catch(ItemNotFoundException &e){ - errorstream<<"Waiting for shader " << name << " timed out."< request = m_get_shader_queue.pop(); diff --git a/src/tile.cpp b/src/tile.cpp index e003c3020..b8080c708 100644 --- a/src/tile.cpp +++ b/src/tile.cpp @@ -775,6 +775,7 @@ void TextureSource::processQueue() /* Fetch textures */ + //NOTE this is only thread safe for ONE consumer thread! if(!m_get_texture_queue.empty()) { GetRequest diff --git a/src/util/container.h b/src/util/container.h index e83c3cd37..6d836a4d5 100644 --- a/src/util/container.h +++ b/src/util/container.h @@ -24,7 +24,7 @@ with this program; if not, write to the Free Software Foundation, Inc., #include "../exceptions.h" #include "../jthread/jmutex.h" #include "../jthread/jmutexautolock.h" -#include "../porting.h" // For sleep_ms +#include "../jthread/jsemaphore.h" #include #include #include @@ -201,6 +201,12 @@ public: ++m_list_size; } + void push_front(T t) + { + m_list.push_front(t); + ++m_list_size; + } + T pop_front() { if(m_list.empty()) @@ -247,86 +253,141 @@ template class MutexedQueue { public: + template + friend class RequestQueue; + MutexedQueue() { } bool empty() { JMutexAutoLock lock(m_mutex); - return m_list.empty(); + return (m_size.GetValue() == 0); } void push_back(T t) { JMutexAutoLock lock(m_mutex); m_list.push_back(t); + m_size.Post(); } - T pop_front(u32 wait_time_max_ms=0) + + /* this version of pop_front returns a empty element of T on timeout. + * Make sure default constructor of T creates a recognizable "empty" element + */ + T pop_frontNoEx(u32 wait_time_max_ms) { - u32 wait_time_ms = 0; + if (m_size.Wait(wait_time_max_ms)) + { + JMutexAutoLock lock(m_mutex); - for(;;) + typename std::list::iterator begin = m_list.begin(); + T t = *begin; + m_list.erase(begin); + return t; + } + else { - { - JMutexAutoLock lock(m_mutex); - - if(!m_list.empty()) - { - typename std::list::iterator begin = m_list.begin(); - T t = *begin; - m_list.erase(begin); - return t; - } - - if(wait_time_ms >= wait_time_max_ms) - throw ItemNotFoundException("MutexedQueue: queue is empty"); - } - - // Wait a while before trying again - sleep_ms(10); - wait_time_ms += 10; + return T(); } } + + T pop_front(u32 wait_time_max_ms) + { + if (m_size.Wait(wait_time_max_ms)) + { + JMutexAutoLock lock(m_mutex); + + typename std::list::iterator begin = m_list.begin(); + T t = *begin; + m_list.erase(begin); + return t; + } + else + { + throw ItemNotFoundException("MutexedQueue: queue is empty"); + } + } + + T pop_frontNoEx() + { + m_size.Wait(); + + JMutexAutoLock lock(m_mutex); + + typename std::list::iterator begin = m_list.begin(); + T t = *begin; + m_list.erase(begin); + return t; + } + T pop_back(u32 wait_time_max_ms=0) { - u32 wait_time_ms = 0; + if (m_size.Wait(wait_time_max_ms)) + { + JMutexAutoLock lock(m_mutex); + + typename std::list::iterator last = m_list.end(); + last--; + T t = *last; + m_list.erase(last); + return t; + } + else + { + throw ItemNotFoundException("MutexedQueue: queue is empty"); + } + } + + /* this version of pop_back returns a empty element of T on timeout. + * Make sure default constructor of T creates a recognizable "empty" element + */ + T pop_backNoEx(u32 wait_time_max_ms=0) + { + if (m_size.Wait(wait_time_max_ms)) + { + JMutexAutoLock lock(m_mutex); - for(;;) + typename std::list::iterator last = m_list.end(); + last--; + T t = *last; + m_list.erase(last); + return t; + } + else { - { - JMutexAutoLock lock(m_mutex); - - if(!m_list.empty()) - { - typename std::list::iterator last = m_list.end(); - last--; - T t = *last; - m_list.erase(last); - return t; - } - - if(wait_time_ms >= wait_time_max_ms) - throw ItemNotFoundException("MutexedQueue: queue is empty"); - } - - // Wait a while before trying again - sleep_ms(10); - wait_time_ms += 10; + return T(); } } + T pop_backNoEx() + { + m_size.Wait(); + + JMutexAutoLock lock(m_mutex); + + typename std::list::iterator last = m_list.end(); + last--; + T t = *last; + m_list.erase(last); + return t; + } + +protected: JMutex & getMutex() { return m_mutex; } + // NEVER EVER modify the >>list<< you got by using this function! + // You may only modify it's content std::list & getList() { return m_list; } -protected: JMutex m_mutex; std::list m_list; + JSemaphore m_size; }; #endif diff --git a/src/util/thread.h b/src/util/thread.h index bb8e03317..8b3c33621 100644 --- a/src/util/thread.h +++ b/src/util/thread.h @@ -24,6 +24,7 @@ with this program; if not, write to the Free Software Foundation, Inc., #include "../jthread/jthread.h" #include "../jthread/jmutex.h" #include "../jthread/jmutexautolock.h" +#include "porting.h" template class MutexedVariable @@ -123,36 +124,38 @@ public: void add(Key key, Caller caller, CallerData callerdata, ResultQueue *dest) { - JMutexAutoLock lock(m_queue.getMutex()); - - /* - If the caller is already on the list, only update CallerData - */ - for(typename std::list< GetRequest >::iterator - i = m_queue.getList().begin(); - i != m_queue.getList().end(); ++i) { - GetRequest &request = *i; - - if(request.key == key) + JMutexAutoLock lock(m_queue.getMutex()); + + /* + If the caller is already on the list, only update CallerData + */ + for(typename std::list< GetRequest >::iterator + i = m_queue.getList().begin(); + i != m_queue.getList().end(); ++i) { - for(typename std::list< CallerInfo >::iterator - i = request.callers.begin(); - i != request.callers.end(); ++i) + GetRequest &request = *i; + + if(request.key == key) { - CallerInfo &ca = *i; - if(ca.caller == caller) + for(typename std::list< CallerInfo >::iterator + i = request.callers.begin(); + i != request.callers.end(); ++i) { - ca.data = callerdata; - return; + CallerInfo &ca = *i; + if(ca.caller == caller) + { + ca.data = callerdata; + return; + } } + CallerInfo ca; + ca.caller = caller; + ca.data = callerdata; + ca.dest = dest; + request.callers.push_back(ca); + return; } - CallerInfo ca; - ca.caller = caller; - ca.data = callerdata; - ca.dest = dest; - request.callers.push_back(ca); - return; } } @@ -168,12 +171,17 @@ public: ca.dest = dest; request.callers.push_back(ca); - m_queue.getList().push_back(request); + m_queue.push_back(request); + } + + GetRequest pop(unsigned int timeout_ms) + { + return m_queue.pop_front(timeout_ms); } - GetRequest pop(bool wait_if_empty=false) + GetRequest pop() { - return m_queue.pop_front(wait_if_empty); + return m_queue.pop_frontNoEx(); } void pushResult(GetRequest req, -- 2.25.1