From 197032bd707e624dae87bd0036545bf4aa6381ca Mon Sep 17 00:00:00 2001 From: David Rose Date: Fri, 29 Jun 2007 21:44:55 +0000 Subject: [PATCH] add stop_threads() etc --- panda/src/event/asyncTaskManager.I | 26 +++++-- panda/src/event/asyncTaskManager.cxx | 103 ++++++++++++++++++--------- panda/src/event/asyncTaskManager.h | 5 ++ 3 files changed, 94 insertions(+), 40 deletions(-) diff --git a/panda/src/event/asyncTaskManager.I b/panda/src/event/asyncTaskManager.I index af194c9874..5b4fa8525d 100644 --- a/panda/src/event/asyncTaskManager.I +++ b/panda/src/event/asyncTaskManager.I @@ -17,20 +17,32 @@ //////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskManager::is_started +// Access: Published +// Description: Returns true if the thread(s) have been started and +// are ready to service requests, false otherwise. If +// this is false, the next call to add() or add_and_do() +// will automatically start the threads. +//////////////////////////////////////////////////////////////////// +INLINE bool AsyncTaskManager:: +is_started() const { + return (_state == S_started); +} + //////////////////////////////////////////////////////////////////// // Function: AsyncTaskManager::get_num_threads // Access: Published // Description: Returns the number of threads that have been created -// to service the tasks within this task manager. +// to service the tasks within this task manager. This +// will return 0 before the threads have been started; +// it will also return 0 if thread support is not +// available. //////////////////////////////////////////////////////////////////// INLINE int AsyncTaskManager:: get_num_threads() const { -#ifdef HAVE_THREADS - return _num_threads; -#else - // Without threading support, this is always 0 threads. - return 0; -#endif + MutexHolder holder(_lock); + return _threads.size(); } //////////////////////////////////////////////////////////////////// diff --git a/panda/src/event/asyncTaskManager.cxx b/panda/src/event/asyncTaskManager.cxx index 6eaea9f034..aaeeae2859 100644 --- a/panda/src/event/asyncTaskManager.cxx +++ b/panda/src/event/asyncTaskManager.cxx @@ -53,19 +53,58 @@ AsyncTaskManager(const string &name, int num_threads) : //////////////////////////////////////////////////////////////////// AsyncTaskManager:: ~AsyncTaskManager() { + stop_threads(); +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskManager::stop_threads +// Access: Published +// Description: Stops any threads that are currently running. If any +// tasks are still pending and have not yet been picked +// up by a thread, they will not be serviced unless +// poll() or start_threads() is later called. +//////////////////////////////////////////////////////////////////// +void AsyncTaskManager:: +stop_threads() { if (_state == S_started) { // Clean up all of the threads. MutexHolder holder(_lock); - _state = S_shutdown; - _cvar.signal_all(); + if (_state == S_started) { + _state = S_shutdown; + _cvar.signal_all(); - Threads::iterator ti; - for (ti = _threads.begin(); ti != _threads.end(); ++ti) { - (*ti)->join(); + Threads wait_threads; + wait_threads.swap(_threads); + + // We have to release the lock while we join, so the threads can + // wake up and see that we're shutting down. + _lock.release(); + Threads::iterator ti; + for (ti = wait_threads.begin(); ti != wait_threads.end(); ++ti) { + (*ti)->join(); + } + _lock.lock(); + + _state = S_initial; } } } +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskManager::start_threads +// Access: Published +// Description: Starts any requested threads to service the tasks on +// the queue. This is normally not necessary, since +// adding a task will start the threads automatically. +//////////////////////////////////////////////////////////////////// +void AsyncTaskManager:: +start_threads() { + if (_state == S_initial) { + MutexHolder holder(_lock); + do_start_threads(); + } +} + //////////////////////////////////////////////////////////////////// // Function: AsyncTaskManager::add // Access: Published @@ -81,19 +120,7 @@ add(AsyncTask *task) { task->_state == AsyncTask::S_inactive); nassertv(find_task(task) == -1); - // Attempt to start the threads, if we haven't already. - if (_state == S_initial) { - _state = S_started; - if (Thread::is_threading_supported()) { - _threads.reserve(_num_threads); - for (int i = 0; i < _num_threads; ++i) { - PT(AsyncTaskManagerThread) thread = new AsyncTaskManagerThread(this); - if (thread->start(TP_low, true)) { - _threads.push_back(thread); - } - } - } - } + do_start_threads(); task->_manager = this; task->_state = AsyncTask::S_active; @@ -118,7 +145,7 @@ add(AsyncTask *task) { // execute the task before returning in the non-threaded // case. In the threaded case, this method behaves // exactly the same as add(). - +// // The return value is true if the task has been added // and is still pending, false if it has completed. //////////////////////////////////////////////////////////////////// @@ -130,19 +157,7 @@ add_and_do(AsyncTask *task) { task->_state == AsyncTask::S_inactive, false); nassertr(find_task(task) == -1, false); - // Attempt to start the threads, if we haven't already. - if (_state == S_initial) { - _state = S_started; - if (Thread::is_threading_supported()) { - _threads.reserve(_num_threads); - for (int i = 0; i < _num_threads; ++i) { - PT(AsyncTaskManagerThread) thread = new AsyncTaskManagerThread(this); - if (thread->start(TP_low, true)) { - _threads.push_back(thread); - } - } - } - } + do_start_threads(); task->_manager = this; task->_state = AsyncTask::S_active; @@ -232,7 +247,7 @@ has_task(AsyncTask *task) const { void AsyncTaskManager:: poll() { MutexHolder holder(_lock); - if (_threads.empty()) { + if (!_threads.empty()) { return; } @@ -389,6 +404,28 @@ task_done(AsyncTask *task) { } } +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskManager::do_start_threads +// Access: Protected +// Description: The private implementation of start_threads; assumes +// the lock is already held. +//////////////////////////////////////////////////////////////////// +void AsyncTaskManager:: +do_start_threads() { + if (_state == S_initial) { + _state = S_started; + if (Thread::is_threading_supported()) { + _threads.reserve(_num_threads); + for (int i = 0; i < _num_threads; ++i) { + PT(AsyncTaskManagerThread) thread = new AsyncTaskManagerThread(this); + if (thread->start(TP_low, true)) { + _threads.push_back(thread); + } + } + } + } +} + //////////////////////////////////////////////////////////////////// // Function: AsyncTaskManager::AsyncTaskManagerThread::Constructor // Access: Public diff --git a/panda/src/event/asyncTaskManager.h b/panda/src/event/asyncTaskManager.h index 93c1682be1..896fa5bfca 100644 --- a/panda/src/event/asyncTaskManager.h +++ b/panda/src/event/asyncTaskManager.h @@ -53,6 +53,9 @@ PUBLISHED: virtual ~AsyncTaskManager(); INLINE int get_num_threads() const; + BLOCKING void stop_threads(); + void start_threads(); + INLINE bool is_started() const; void add(AsyncTask *task); bool add_and_do(AsyncTask *task); @@ -72,7 +75,9 @@ protected: int find_task(AsyncTask *task) const; void service_one_task(AsyncTaskManagerThread *thread); void task_done(AsyncTask *task); + void do_start_threads(); +protected: class AsyncTaskManagerThread : public Thread { public: AsyncTaskManagerThread(AsyncTaskManager *manager);