#pragma once #include #include #include #include #include "rethrow.h" #include namespace ThreadUtils { // Serialize access to some resource to a single thread // Execute blocking/nonabortable methods in with proper abortability (detach on abort and move on) class cmdThread { public: typedef std::function func_t; typedef pfc::waitQueue queue_t; typedef std::function funcAbortable_t; protected: std::function makeWorker() { auto q = m_queue; auto x = m_atExit; return [q, x] { for ( ;; ) { func_t f; if (!q->get(f)) break; try { f(); } catch(...) {} } // No guard for atExit access, as nobody is supposed to be still able to call host object methods by the time we get here for( auto i = x->begin(); i != x->end(); ++ i ) { auto f = *i; try { f(); } catch(...) {} } }; }; std::function makeWorker2( std::function updater, double interval) { auto q = m_queue; auto x = m_atExit; return [=] { pfc::lores_timer t; t.start(); for ( ;; ) { { bool bWorkReady = false; double left = interval - t.query(); if ( left > 0 ) { if (q->wait_read( left )) bWorkReady = true; } if (!bWorkReady) { updater(); t.start(); continue; } } func_t f; if (!q->get(f)) break; try { f(); } catch(...) {} } // No guard for atExit access, as nobody is supposed to be still able to call host object methods by the time we get here for( auto i = x->begin(); i != x->end(); ++ i ) { auto f = *i; try { f(); } catch(...) {} } }; }; // For derived classes: create new instance without starting thread, supply thread using by yourself class noCreate {}; cmdThread( noCreate ) {} public: cmdThread() { pfc::splitThread( makeWorker() ); } void atExit( func_t f ) { m_atExit->push_back(f); } ~cmdThread() { m_queue->set_eof(); } void runSynchronously( func_t f ) { runSynchronously_(f, nullptr); } void runSynchronously_( func_t f, abort_callback * abortOrNull ) { auto evt = m_eventPool.make(); evt->set_state(false); auto rethrow = std::make_shared(); auto worker2 = [f, rethrow, evt] { rethrow->exec(f); evt->set_state( true ); }; add ( worker2 ); if ( abortOrNull != nullptr ) { abortOrNull->waitForEvent( * evt, -1 ); } else { evt->wait_for(-1); } m_eventPool.put( evt ); rethrow->rethrow(); } void runSynchronously( func_t f, abort_callback & abort ) { runSynchronously_(f, &abort); } void runSynchronously2( funcAbortable_t f, abort_callback & abort ) { auto subAbort = m_abortPool.make(); subAbort->reset(); auto worker = [subAbort, f] { f(*subAbort); }; try { runSynchronously( worker, abort ); } catch(...) { subAbort->set(); throw; } m_abortPool.put( subAbort ); } void add( func_t f ) { m_queue->put( f ); } private: pfc::objPool m_eventPool; pfc::objPool m_abortPool; std::shared_ptr m_queue = std::make_shared(); typedef std::list atExit_t; std::shared_ptr m_atExit = std::make_shared< atExit_t >(); }; }