#pragma once #include #include #include #include "rethrow.h" namespace ThreadUtils { // Serialize access to some resource to a single thread // Execute blocking/nonabortable methods in with proper abortability (detach on abort and move on) class cmdThread { public: typedef std::function func_t; typedef pfc::waitQueue queue_t; typedef std::function funcAbortable_t; cmdThread() { auto q = std::make_shared(); m_queue = q; auto x = std::make_shared(); m_atExit = x; pfc::splitThread( [q, x] { for ( ;; ) { func_t f; if (!q->get(f)) break; try { f(); } catch(...) {} } // No guard for atExit access, as nobody is supposed to be still able to call host object methods by the time we get here for( auto i = x->begin(); i != x->end(); ++ i ) { auto f = *i; try { f(); } catch(...) {} } } ); } void atExit( func_t f ) { m_atExit->push_back(f); } ~cmdThread() { m_queue->set_eof(); } void runSynchronously( func_t f, abort_callback & abort ) { auto evt = m_eventPool.make(); evt->set_state(false); auto rethrow = std::make_shared(); auto worker2 = [f, rethrow, evt] { rethrow->exec(f); evt->set_state( true ); }; add ( worker2 ); abort.waitForEvent( * evt, -1 ); m_eventPool.put( evt ); rethrow->rethrow(); } void runSynchronously2( funcAbortable_t f, abort_callback & abort ) { auto subAbort = m_abortPool.make(); subAbort->reset(); auto worker = [subAbort, f] { f(*subAbort); }; try { runSynchronously( worker, abort ); } catch(...) { subAbort->set(); throw; } m_abortPool.put( subAbort ); } void add( func_t f ) { m_queue->put( f ); } private: pfc::objPool m_eventPool; pfc::objPool m_abortPool; std::shared_ptr m_queue; typedef std::list atExit_t; std::shared_ptr m_atExit; }; }