diff --git a/panda/src/event/Sources.pp b/panda/src/event/Sources.pp index acb59bbb82..d89e1aec1b 100644 --- a/panda/src/event/Sources.pp +++ b/panda/src/event/Sources.pp @@ -9,6 +9,7 @@ #define SOURCES \ asyncTask.h asyncTask.I \ + asyncTaskChain.h asyncTaskChain.I \ asyncTaskCollection.h asyncTaskCollection.I \ asyncTaskManager.h asyncTaskManager.I \ config_event.h \ @@ -24,6 +25,7 @@ #define INCLUDED_SOURCES \ asyncTask.cxx \ + asyncTaskChain.cxx \ asyncTaskCollection.cxx \ asyncTaskManager.cxx \ buttonEvent.cxx \ @@ -37,6 +39,7 @@ #define INSTALL_HEADERS \ asyncTask.h asyncTask.I \ + asyncTaskChain.h asyncTaskChain.I \ asyncTaskCollection.h asyncTaskCollection.I \ asyncTaskManager.h asyncTaskManager.I \ buttonEvent.I buttonEvent.h \ diff --git a/panda/src/event/asyncTask.I b/panda/src/event/asyncTask.I index 79e91d99c5..7707d2620f 100644 --- a/panda/src/event/asyncTask.I +++ b/panda/src/event/asyncTask.I @@ -28,7 +28,8 @@ AsyncTask(const string &name) : _priority(0), _state(S_inactive), _servicing_thread(NULL), - _manager(NULL) + _manager(NULL), + _chain(NULL) { #ifdef HAVE_PYTHON _python_object = NULL; @@ -160,6 +161,19 @@ clear_name() { set_name(string()); } +//////////////////////////////////////////////////////////////////// +// Function: AsyncTask::get_task_chain +// Access: Published +// Description: Returns the AsyncTaskChain on which this task will +// be running. Each task chain runs tasks independently +// of the others. +//////////////////////////////////////////////////////////////////// +INLINE const string &AsyncTask:: +get_task_chain() const { + return _chain_name; +} + + //////////////////////////////////////////////////////////////////// // Function: AsyncTask::get_sort // Access: Published diff --git a/panda/src/event/asyncTask.cxx b/panda/src/event/asyncTask.cxx index 0188d78e15..822c5b707e 100644 --- a/panda/src/event/asyncTask.cxx +++ b/panda/src/event/asyncTask.cxx @@ -14,6 +14,7 @@ #include "asyncTask.h" #include "asyncTaskManager.h" +#include "config_event.h" TypeHandle AsyncTask::_type_handle; @@ -24,7 +25,7 @@ TypeHandle AsyncTask::_type_handle; //////////////////////////////////////////////////////////////////// AsyncTask:: ~AsyncTask() { - nassertv(_state == S_inactive && _manager == NULL); + nassertv(_state == S_inactive && _manager == NULL && _chain == NULL); } //////////////////////////////////////////////////////////////////// @@ -61,7 +62,7 @@ get_elapsed_time() const { //////////////////////////////////////////////////////////////////// // Function: AsyncTask::set_name -// Access: Public +// Access: Published // Description: //////////////////////////////////////////////////////////////////// void AsyncTask:: @@ -83,6 +84,52 @@ set_name(const string &name) { } } +//////////////////////////////////////////////////////////////////// +// Function: AsyncTask::set_task_chain +// Access: Published +// Description: Specifies the AsyncTaskChain on which this task will +// be running. Each task chain runs tasks independently +// of the others. +//////////////////////////////////////////////////////////////////// +void AsyncTask:: +set_task_chain(const string &chain_name) { + if (chain_name != _chain_name) { + if (_manager != (AsyncTaskManager *)NULL) { + MutexHolder holder(_manager->_lock); + if (_state == S_active) { + // Changing chains on an "active" (i.e. enqueued) task means + // removing it and re-inserting it into the queue. + PT(AsyncTask) hold_task = this; + PT(AsyncTaskManager) manager = _manager; + + AsyncTaskChain *chain_a = manager->do_find_task_chain(_chain_name); + nassertv(chain_a != (AsyncTaskChain *)NULL); + chain_a->do_remove(this); + _chain_name = chain_name; + + AsyncTaskChain *chain_b = manager->do_find_task_chain(_chain_name); + if (chain_b == (AsyncTaskChain *)NULL) { + event_cat.warning() + << "Creating implicit AsyncTaskChain " << _chain_name + << " for " << manager->get_type() << " " + << manager->get_name() << "\n"; + chain_b = manager->do_make_task_chain(_chain_name); + } + chain_b->do_add(this); + + } else { + // If it's sleeping, currently being serviced, or something + // else, we can just change the chain_name value directly. + _chain_name = chain_name; + } + } else { + // If it hasn't been started anywhere, we can just change the + // chain_name value. + _chain_name = chain_name; + } + } +} + //////////////////////////////////////////////////////////////////// // Function: AsyncTask::set_sort // Access: Published @@ -104,7 +151,7 @@ set_sort(int sort) { if (sort != _sort) { if (_manager != (AsyncTaskManager *)NULL) { MutexHolder holder(_manager->_lock); - if (_state == S_active && _sort >= _manager->_current_sort) { + if (_state == S_active && _sort >= _chain->_current_sort) { // Changing sort on an "active" (i.e. enqueued) task means // removing it and re-inserting it into the queue. PT(AsyncTask) hold_task = this; @@ -143,7 +190,7 @@ set_priority(int priority) { if (priority != _priority) { if (_manager != (AsyncTaskManager *)NULL) { MutexHolder holder(_manager->_lock); - if (_state == S_active && _sort >= _manager->_current_sort) { + if (_state == S_active && _sort >= _chain->_current_sort) { // Changing priority on an "active" (i.e. enqueued) task means // removing it and re-inserting it into the queue. PT(AsyncTask) hold_task = this; diff --git a/panda/src/event/asyncTask.h b/panda/src/event/asyncTask.h index 9637d876f5..b300a80494 100644 --- a/panda/src/event/asyncTask.h +++ b/panda/src/event/asyncTask.h @@ -31,6 +31,7 @@ #endif // HAVE_PYTHON class AsyncTaskManager; +class AsyncTaskChain; //////////////////////////////////////////////////////////////////// // Class : AsyncTask @@ -78,6 +79,9 @@ PUBLISHED: void set_name(const string &name); INLINE void clear_name(); + void set_task_chain(const string &chain_name); + INLINE const string &get_task_chain() const; + void set_sort(int sort); INLINE int get_sort() const; @@ -98,6 +102,7 @@ protected: virtual DoneStatus do_task(); protected: + string _chain_name; double _delay; bool _has_delay; double _wake_time; @@ -108,6 +113,7 @@ protected: State _state; Thread *_servicing_thread; AsyncTaskManager *_manager; + AsyncTaskChain *_chain; private: #ifdef HAVE_PYTHON @@ -132,6 +138,7 @@ private: static TypeHandle _type_handle; friend class AsyncTaskManager; + friend class AsyncTaskChain; }; INLINE ostream &operator << (ostream &out, const AsyncTask &task) { diff --git a/panda/src/event/asyncTaskChain.I b/panda/src/event/asyncTaskChain.I new file mode 100644 index 0000000000..fe808fd445 --- /dev/null +++ b/panda/src/event/asyncTaskChain.I @@ -0,0 +1,49 @@ +// Filename: asyncTaskChain.I +// Created by: drose (23Aug06) +// +//////////////////////////////////////////////////////////////////// +// +// PANDA 3D SOFTWARE +// Copyright (c) Carnegie Mellon University. All rights reserved. +// +// All use of this software is subject to the terms of the revised BSD +// license. You should have received a copy of this license along +// with this source code in a file named "LICENSE." +// +//////////////////////////////////////////////////////////////////// + + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::is_started +// Access: Published +// Description: Returns true if the thread(s) have been started and +// are ready to service requests, false otherwise. If +// this is false, the next call to add() or add_and_do() +// will automatically start the threads. +//////////////////////////////////////////////////////////////////// +INLINE bool AsyncTaskChain:: +is_started() const { + return (_state == S_started); +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::set_tick_clock +// Access: Published +// Description: Sets the tick_clock flag. When this is true, +// get_clock()->tick() will be called automatically at +// each task epoch. This is false by default. +//////////////////////////////////////////////////////////////////// +INLINE void AsyncTaskChain:: +set_tick_clock(bool clock) { + _tick_clock = clock; +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::get_tick_clock +// Access: Published +// Description: Returns the tick_clock flag.. See set_tick_clock(). +//////////////////////////////////////////////////////////////////// +INLINE bool AsyncTaskChain:: +get_tick_clock() const { + return _tick_clock; +} diff --git a/panda/src/event/asyncTaskChain.cxx b/panda/src/event/asyncTaskChain.cxx new file mode 100644 index 0000000000..109a6c1cbc --- /dev/null +++ b/panda/src/event/asyncTaskChain.cxx @@ -0,0 +1,929 @@ +// Filename: asyncTaskChain.cxx +// Created by: drose (23Aug06) +// +//////////////////////////////////////////////////////////////////// +// +// PANDA 3D SOFTWARE +// Copyright (c) Carnegie Mellon University. All rights reserved. +// +// All use of this software is subject to the terms of the revised BSD +// license. You should have received a copy of this license along +// with this source code in a file named "LICENSE." +// +//////////////////////////////////////////////////////////////////// + +#include "asyncTaskChain.h" +#include "asyncTaskManager.h" +#include "event.h" +#include "pt_Event.h" +#include "throw_event.h" +#include "eventParameter.h" +#include "mutexHolder.h" +#include "indent.h" +#include "pStatClient.h" +#include "pStatTimer.h" +#include "clockObject.h" +#include + +TypeHandle AsyncTaskChain::_type_handle; + +PStatCollector AsyncTaskChain::_task_pcollector("Task"); +PStatCollector AsyncTaskChain::_wait_pcollector("Wait"); + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::Constructor +// Access: Published +// Description: +//////////////////////////////////////////////////////////////////// +AsyncTaskChain:: +AsyncTaskChain(AsyncTaskManager *manager, const string &name) : + Namable(name), + _manager(manager), + _tick_clock(false), + _num_threads(0), + _cvar(manager->_lock), + _num_tasks(0), + _num_busy_threads(0), + _state(S_initial), + _current_sort(INT_MAX), + _needs_cleanup(false) +{ +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::Destructor +// Access: Published, Virtual +// Description: +//////////////////////////////////////////////////////////////////// +AsyncTaskChain:: +~AsyncTaskChain() { + // We only grab the lock if _needs_cleanup is true. This way, the + // temporary AsyncTaskChain objects created (and destructed) within + // the task manager won't risk a double-lock. + if (_needs_cleanup) { + MutexHolder holder(_manager->_lock); + do_cleanup(); + } +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::set_num_threads +// Access: Published +// Description: Changes the number of threads for this task chain. +// This may require stopping the threads if they are +// already running. +//////////////////////////////////////////////////////////////////// +void AsyncTaskChain:: +set_num_threads(int num_threads) { + nassertv(num_threads >= 0); + + if (!Thread::is_threading_supported()) { + num_threads = 0; + } + + MutexHolder holder(_manager->_lock); + if (_num_threads != num_threads) { + do_stop_threads(); + _num_threads = num_threads; + + if (_num_tasks != 0) { + do_start_threads(); + } + } +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::get_num_threads +// Access: Published +// Description: Returns the number of threads that will be servicing +// tasks for this chain. Also see +// get_num_running_threads(). +//////////////////////////////////////////////////////////////////// +int AsyncTaskChain:: +get_num_threads() const { + MutexHolder holder(_manager->_lock); + return _num_threads; +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::get_num_running_threads +// Access: Published +// Description: Returns the number of threads that have been created +// and are actively running. This will return 0 before +// the threads have been started; it will also return 0 +// if thread support is not available. +//////////////////////////////////////////////////////////////////// +int AsyncTaskChain:: +get_num_running_threads() const { + MutexHolder holder(_manager->_lock); + return _threads.size(); +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::stop_threads +// Access: Published +// Description: Stops any threads that are currently running. If any +// tasks are still pending and have not yet been picked +// up by a thread, they will not be serviced unless +// poll() or start_threads() is later called. +//////////////////////////////////////////////////////////////////// +void AsyncTaskChain:: +stop_threads() { + if (_state == S_started || _state == S_aborting) { + // Clean up all of the threads. + MutexHolder holder(_manager->_lock); + do_stop_threads(); + } +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::start_threads +// Access: Published +// Description: Starts any requested threads to service the tasks on +// the queue. This is normally not necessary, since +// adding a task will start the threads automatically. +//////////////////////////////////////////////////////////////////// +void AsyncTaskChain:: +start_threads() { + if (_state == S_initial || _state == S_aborting) { + MutexHolder holder(_manager->_lock); + do_start_threads(); + } +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::has_task +// Access: Published +// Description: Returns true if the indicated task has been added to +// this AsyncTaskChain, false otherwise. +//////////////////////////////////////////////////////////////////// +bool AsyncTaskChain:: +has_task(AsyncTask *task) const { + MutexHolder holder(_manager->_lock); + + if (task->_chain != this) { + nassertr(!do_has_task(task), false); + return false; + } + + if (task->_state == AsyncTask::S_servicing_removed) { + return false; + } + + // The task might not actually be in the active queue, since it + // might be being serviced right now. That's OK. + return true; +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::wait_for_tasks +// Access: Published +// Description: Blocks until the task list is empty. +//////////////////////////////////////////////////////////////////// +void AsyncTaskChain:: +wait_for_tasks() { + MutexHolder holder(_manager->_lock); + do_wait_for_tasks(); +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::get_num_tasks +// Access: Published +// Description: Returns the number of tasks that are currently active +// or sleeping within the task chain. +//////////////////////////////////////////////////////////////////// +int AsyncTaskChain:: +get_num_tasks() const { + MutexHolder holder(_manager->_lock); + return _num_tasks; +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::get_tasks +// Access: Published +// Description: Returns the set of tasks that are active or sleeping +// on the task chain, at the time of the call. +//////////////////////////////////////////////////////////////////// +AsyncTaskCollection AsyncTaskChain:: +get_tasks() const { + MutexHolder holder(_manager->_lock); + AsyncTaskCollection result = do_get_active_tasks(); + result.add_tasks_from(do_get_sleeping_tasks()); + return result; +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::get_active_tasks +// Access: Published +// Description: Returns the set of tasks that are active (and not +// sleeping) on the task chain, at the time of the +// call. +//////////////////////////////////////////////////////////////////// +AsyncTaskCollection AsyncTaskChain:: +get_active_tasks() const { + MutexHolder holder(_manager->_lock); + return do_get_active_tasks(); +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::get_sleeping_tasks +// Access: Published +// Description: Returns the set of tasks that are sleeping (and not +// active) on the task chain, at the time of the +// call. +//////////////////////////////////////////////////////////////////// +AsyncTaskCollection AsyncTaskChain:: +get_sleeping_tasks() const { + MutexHolder holder(_manager->_lock); + return do_get_sleeping_tasks(); +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::poll +// Access: Published +// Description: Runs through all the tasks in the task list, once, if +// the task chain is running in single-threaded mode +// (no threads available). This method does nothing in +// threaded mode, so it may safely be called in either +// case. +// +// Normally, you would not call this function directly; +// instead, call AsyncTaskManager::poll(), which polls +// all of the task chains in sequence. +//////////////////////////////////////////////////////////////////// +void AsyncTaskChain:: +poll() { + MutexHolder holder(_manager->_lock); + do_poll(); +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::output +// Access: Published, Virtual +// Description: +//////////////////////////////////////////////////////////////////// +void AsyncTaskChain:: +output(ostream &out) const { + MutexHolder holder(_manager->_lock); + + out << get_type() << " " << get_name() + << "; " << _num_tasks << " tasks"; +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::write +// Access: Published, Virtual +// Description: +//////////////////////////////////////////////////////////////////// +void AsyncTaskChain:: +write(ostream &out, int indent_level) const { + MutexHolder holder(_manager->_lock); + do_write(out, indent_level); +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::do_add +// Access: Protected +// Description: Adds the indicated task to the active queue. It is +// an error if the task is already added to this or any +// other active queue. +// +// This is normally called only by the AsyncTaskManager. +// Assumes the lock is already held. +//////////////////////////////////////////////////////////////////// +void AsyncTaskChain:: +do_add(AsyncTask *task) { + nassertv(task->_chain == NULL && + task->_manager == NULL && + task->_chain_name == get_name() && + task->_state == AsyncTask::S_inactive); + nassertv(!do_has_task(task)); + + do_start_threads(); + + task->_chain = this; + task->_manager = _manager; + + double now = _manager->_clock->get_frame_time(); + task->_start_time = now; + + if (task->has_delay()) { + // This is a deferred task. Add it to the sleeping queue. + task->_wake_time = now + task->get_delay(); + task->_state = AsyncTask::S_sleeping; + _sleeping.push_back(task); + push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime()); + + } else { + // This is an active task. Add it to the active set. + task->_state = AsyncTask::S_active; + if (task->get_sort() > _current_sort) { + // It will run this frame. + _active.push_back(task); + push_heap(_active.begin(), _active.end(), AsyncTaskSortPriority()); + } else { + // It will run next frame. + _next_active.push_back(task); + } + } + ++_num_tasks; + ++(_manager->_num_tasks); + _needs_cleanup = true; + _cvar.signal_all(); +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::do_remove +// Access: Protected +// Description: Removes the indicated task from this chain. Returns +// true if removed, false otherwise. Assumes the lock +// is already held. +//////////////////////////////////////////////////////////////////// +bool AsyncTaskChain:: +do_remove(AsyncTask *task) { + bool removed = false; + + nassertr(task->_chain == this, false); + + switch (task->_state) { + case AsyncTask::S_servicing: + // This task is being serviced. + task->_state = AsyncTask::S_servicing_removed; + removed = true; + break; + + case AsyncTask::S_servicing_removed: + // Being serviced, though it will be removed later. + break; + + case AsyncTask::S_sleeping: + // Sleeping, easy. + { + int index = find_task_on_heap(_sleeping, task); + nassertr(index != -1, false); + _sleeping.erase(_sleeping.begin() + index); + make_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime()); + removed = true; + cleanup_task(task, false); + } + break; + + case AsyncTask::S_active: + { + // Active, but not being serviced, easy. + int index = find_task_on_heap(_active, task); + if (index != -1) { + _active.erase(_active.begin() + index); + make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority()); + } else { + index = find_task_on_heap(_next_active, task); + nassertr(index != -1, false); + _next_active.erase(_next_active.begin() + index); + } + removed = true; + cleanup_task(task, false); + } + } + + return removed; +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::do_wait_for_tasks +// Access: Protected +// Description: Blocks until the task list is empty. Assumes the +// lock is held. +//////////////////////////////////////////////////////////////////// +void AsyncTaskChain:: +do_wait_for_tasks() { + do_start_threads(); + + if (_threads.empty()) { + // Non-threaded case. + while (_num_tasks > 0) { + if (_state == S_shutdown || _state == S_aborting) { + return; + } + do_poll(); + } + + } else { + // Threaded case. + while (_num_tasks > 0) { + if (_state == S_shutdown || _state == S_aborting) { + return; + } + + PStatTimer timer(_wait_pcollector); + _cvar.wait(); + } + } +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::do_cleanup +// Access: Protected +// Description: Stops all threads and messily empties the task list. +// This is intended to be called on destruction only. +// Assumes the lock is already held. +//////////////////////////////////////////////////////////////////// +void AsyncTaskChain:: +do_cleanup() { + do_stop_threads(); + + TaskHeap::const_iterator ti; + for (ti = _active.begin(); ti != _active.end(); ++ti) { + AsyncTask *task = (*ti); + cleanup_task(task, false); + } + for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) { + AsyncTask *task = (*ti); + cleanup_task(task, false); + } + for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) { + AsyncTask *task = (*ti); + cleanup_task(task, false); + } + + _needs_cleanup = false; +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::do_has_task +// Access: Protected +// Description: Returns true if the task is on one of the task lists, +// false if it is not (false may mean that the task is +// currently being serviced). Assumes the lock is +// currently held. +//////////////////////////////////////////////////////////////////// +bool AsyncTaskChain:: +do_has_task(AsyncTask *task) const { + return (find_task_on_heap(_active, task) != -1 || + find_task_on_heap(_next_active, task) != -1 || + find_task_on_heap(_sleeping, task) != -1); +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::find_task_on_heap +// Access: Protected +// Description: Returns the index number of the indicated task within +// the specified task list, or -1 if the task is not +// found in the list (this may mean that it is currently +// being serviced). Assumes that the lock is currently +// held. +//////////////////////////////////////////////////////////////////// +int AsyncTaskChain:: +find_task_on_heap(const TaskHeap &heap, AsyncTask *task) const { + for (int i = 0; i < (int)heap.size(); ++i) { + if (heap[i] == task) { + return i; + } + } + + return -1; +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::service_one_task +// Access: Protected +// Description: Pops a single task off the active queue, services it, +// and restores it to the end of the queue. This is +// called internally only within one of the task +// threads. Assumes the lock is already held. +//////////////////////////////////////////////////////////////////// +void AsyncTaskChain:: +service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) { + if (!_active.empty()) { + PT(AsyncTask) task = _active.front(); + pop_heap(_active.begin(), _active.end(), AsyncTaskSortPriority()); + _active.pop_back(); + + if (thread != (AsyncTaskChain::AsyncTaskChainThread *)NULL) { + thread->_servicing = task; + } + + nassertv(task->get_sort() == _current_sort); + nassertv(task->_state == AsyncTask::S_active); + task->_state = AsyncTask::S_servicing; + task->_servicing_thread = thread; + + // Now release the chain lock while we actually service the + // task. + _manager->_lock.release(); + AsyncTask::DoneStatus ds = task->do_task(); + + // Now we have to re-acquire the chain lock, so we can put the + // task back on the queue (and so we can return with the lock + // still held). + _manager->_lock.lock(); + + if (thread != (AsyncTaskChain::AsyncTaskChainThread *)NULL) { + thread->_servicing = NULL; + } + task->_servicing_thread = NULL; + + if (task->_chain == this) { + // TODO: check task->_chain_name to see if the task wants to + // jump chains. + + if (task->_state == AsyncTask::S_servicing_removed) { + // This task wants to kill itself. + cleanup_task(task, false); + + } else { + switch (ds) { + case AsyncTask::DS_cont: + // The task is still alive; put it on the next frame's active + // queue. + task->_state = AsyncTask::S_active; + _next_active.push_back(task); + _cvar.signal_all(); + break; + + case AsyncTask::DS_again: + // The task wants to sleep again. + { + double now = _manager->_clock->get_frame_time(); + task->_wake_time = now + task->get_delay(); + task->_state = AsyncTask::S_sleeping; + _sleeping.push_back(task); + push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime()); + _cvar.signal_all(); + } + break; + + case AsyncTask::DS_abort: + // The task had an exception and wants to raise a big flag. + cleanup_task(task, false); + if (_state == S_started) { + _state = S_aborting; + _cvar.signal_all(); + } + break; + + default: + // The task has finished. + cleanup_task(task, true); + } + } + } + } +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::cleanup_task +// Access: Protected +// Description: Called internally when a task has completed (or been +// interrupted) and is about to be removed from the +// active queue. Assumes the lock is held. +//////////////////////////////////////////////////////////////////// +void AsyncTaskChain:: +cleanup_task(AsyncTask *task, bool clean_exit) { + nassertv(task->_chain == this); + + task->_state = AsyncTask::S_inactive; + task->_chain = NULL; + task->_manager = NULL; + --_num_tasks; + --(_manager->_num_tasks); + + _manager->remove_task_by_name(task); + + if (clean_exit && !task->_done_event.empty()) { + PT_Event event = new Event(task->_done_event); + event->add_parameter(EventParameter(task)); + throw_event(event); + } +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::finish_sort_group +// Access: Protected +// Description: Called internally when all tasks of a given sort +// value have been completed, and it is time to +// increment to the next sort value, or begin the next +// epoch. Assumes the lock is held. +// +// Returns true if there are more tasks on the queue +// after this operation, or false if the task list is +// empty and we need to wait. +//////////////////////////////////////////////////////////////////// +bool AsyncTaskChain:: +finish_sort_group() { + nassertr(_num_busy_threads == 0, true); + + if (!_active.empty()) { + // There are more tasks; just set the next sort value. + nassertr(_current_sort < _active.front()->get_sort(), true); + _current_sort = _active.front()->get_sort(); + _cvar.signal_all(); + return true; + } + + // There are no more tasks in this epoch; advance to the next epoch. + if (_tick_clock) { + _manager->_clock->tick(); + } + if (!_threads.empty()) { + PStatClient::thread_tick(get_name()); + } + + _active.swap(_next_active); + + // Check for any sleeping tasks that need to be woken. + double now = _manager->_clock->get_frame_time(); + while (!_sleeping.empty() && _sleeping.front()->get_wake_time() <= now) { + PT(AsyncTask) task = _sleeping.front(); + pop_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime()); + _sleeping.pop_back(); + task->_state = AsyncTask::S_active; + _active.push_back(task); + } + + make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority()); + nassertr(_num_tasks == _active.size() + _sleeping.size(), true); + + if (!_active.empty()) { + // Get the first task on the queue. + _current_sort = _active.front()->get_sort(); + _cvar.signal_all(); + return true; + } + + // There are no tasks to be had anywhere. Chill. + _current_sort = INT_MAX; + return false; +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::do_stop_threads +// Access: Protected +// Description: The private implementation of stop_threads; assumes +// the lock is already held. +//////////////////////////////////////////////////////////////////// +void AsyncTaskChain:: +do_stop_threads() { + if (_state == S_started || _state == S_aborting) { + _state = S_shutdown; + _cvar.signal_all(); + + Threads wait_threads; + wait_threads.swap(_threads); + + // We have to release the lock while we join, so the threads can + // wake up and see that we're shutting down. + _manager->_lock.release(); + Threads::iterator ti; + for (ti = wait_threads.begin(); ti != wait_threads.end(); ++ti) { + (*ti)->join(); + } + _manager->_lock.lock(); + + _state = S_initial; + nassertv(_num_busy_threads == 0); + } +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::do_start_threads +// Access: Protected +// Description: The private implementation of start_threads; assumes +// the lock is already held. +//////////////////////////////////////////////////////////////////// +void AsyncTaskChain:: +do_start_threads() { + if (_state == S_aborting) { + do_stop_threads(); + } + + if (_state == S_initial) { + _state = S_started; + _num_busy_threads = 0; + if (Thread::is_threading_supported()) { + _needs_cleanup = true; + _threads.reserve(_num_threads); + for (int i = 0; i < _num_threads; ++i) { + ostringstream strm; + strm << _manager->get_name(); + if (has_name()) { + strm << "_" << get_name(); + } + strm << "_" << i; + PT(AsyncTaskChainThread) thread = new AsyncTaskChainThread(strm.str(), this); + if (thread->start(TP_low, true)) { + _threads.push_back(thread); + } + } + } + } +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::do_get_active_tasks +// Access: Protected +// Description: Returns the set of tasks that are active (and not +// sleeping) on the task chain, at the time of the +// call. Assumes the lock is held. +//////////////////////////////////////////////////////////////////// +AsyncTaskCollection AsyncTaskChain:: +do_get_active_tasks() const { + AsyncTaskCollection result; + + Threads::const_iterator thi; + for (thi = _threads.begin(); thi != _threads.end(); ++thi) { + AsyncTask *task = (*thi)->_servicing; + if (task != (AsyncTask *)NULL) { + result.add_task(task); + } + } + TaskHeap::const_iterator ti; + for (ti = _active.begin(); ti != _active.end(); ++ti) { + AsyncTask *task = (*ti); + result.add_task(task); + } + for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) { + AsyncTask *task = (*ti); + result.add_task(task); + } + + return result; +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::do_get_sleeping_tasks +// Access: Published +// Description: Returns the set of tasks that are sleeping (and not +// active) on the task chain, at the time of the +// call. Assumes the lock is held. +//////////////////////////////////////////////////////////////////// +AsyncTaskCollection AsyncTaskChain:: +do_get_sleeping_tasks() const { + AsyncTaskCollection result; + + TaskHeap::const_iterator ti; + for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) { + AsyncTask *task = (*ti); + result.add_task(task); + } + + return result; +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::do_poll +// Access: Protected +// Description: The private implementation of poll(), this assumes +// the lock is already held. +//////////////////////////////////////////////////////////////////// +void AsyncTaskChain:: +do_poll() { + do_start_threads(); + + if (!_threads.empty()) { + return; + } + + while (!_active.empty() && _state != S_shutdown && _state != S_aborting) { + _current_sort = _active.front()->get_sort(); + service_one_task(NULL); + } + + if (_state != S_shutdown && _state != S_aborting) { + finish_sort_group(); + } +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::do_write +// Access: Protected +// Description: The private implementation of write(), this assumes +// the lock is already held. +//////////////////////////////////////////////////////////////////// +void AsyncTaskChain:: +do_write(ostream &out, int indent_level) const { + indent(out, indent_level) + << get_type() << " " << get_name() << "\n"; + + // Collect a list of all active tasks, then sort them into order for + // output. + TaskHeap tasks = _active; + tasks.insert(tasks.end(), _next_active.begin(), _next_active.end()); + + Threads::const_iterator thi; + for (thi = _threads.begin(); thi != _threads.end(); ++thi) { + AsyncTask *task = (*thi)->_servicing; + if (task != (AsyncTask *)NULL && + task->_state != AsyncTask::S_servicing_removed) { + tasks.push_back(task); + } + } + + if (!tasks.empty()) { + indent(out, indent_level + 2) + << "Active tasks:\n"; + + sort(tasks.begin(), tasks.end(), AsyncTaskSortPriority()); + + // Since AsyncTaskSortPriority() sorts backwards (because of STL's + // push_heap semantics), we go through the task list in reverse + // order to print them forwards. + TaskHeap::reverse_iterator ti; + int current_sort = tasks.back()->get_sort() - 1; + for (ti = tasks.rbegin(); ti != tasks.rend(); ++ti) { + AsyncTask *task = (*ti); + if (task->get_sort() != current_sort) { + current_sort = task->get_sort(); + indent(out, indent_level + 2) + << "sort = " << current_sort << "\n"; + } + if (task->_state == AsyncTask::S_servicing) { + indent(out, indent_level + 3) + << "*" << *task << "\n"; + } else { + indent(out, indent_level + 4) + << *task << "\n"; + } + } + } + + if (!_sleeping.empty()) { + indent(out, indent_level + 2) + << "Sleeping tasks:\n"; + double now = _manager->_clock->get_frame_time(); + + // Instead of iterating through the _sleeping list in heap order, + // copy it and then use repeated pops to get it out in sorted + // order, for the user's satisfaction. + TaskHeap sleeping = _sleeping; + while (!sleeping.empty()) { + PT(AsyncTask) task = sleeping.front(); + pop_heap(sleeping.begin(), sleeping.end(), AsyncTaskSortWakeTime()); + sleeping.pop_back(); + + indent(out, indent_level + 4) + << task->get_wake_time() - now << "s: " + << *task << "\n"; + } + } +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::AsyncTaskChainThread::Constructor +// Access: Public +// Description: +//////////////////////////////////////////////////////////////////// +AsyncTaskChain::AsyncTaskChainThread:: +AsyncTaskChainThread(const string &name, AsyncTaskChain *chain) : + Thread(name, chain->get_name()), + _chain(chain), + _servicing(NULL) +{ +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskChain::AsyncTaskChainThread::thread_main +// Access: Public, Virtual +// Description: +//////////////////////////////////////////////////////////////////// +void AsyncTaskChain::AsyncTaskChainThread:: +thread_main() { + MutexHolder holder(_chain->_manager->_lock); + while (_chain->_state != S_shutdown && _chain->_state != S_aborting) { + if (!_chain->_active.empty() && + _chain->_active.front()->get_sort() == _chain->_current_sort) { + PStatTimer timer(_task_pcollector); + _chain->_num_busy_threads++; + _chain->service_one_task(this); + _chain->_num_busy_threads--; + _chain->_cvar.signal_all(); + + } else { + // We've finished all the available tasks of the current sort + // value. We can't pick up a new task until all of the threads + // finish the tasks with the same sort value. + if (_chain->_num_busy_threads == 0) { + // We're the last thread to finish. Update _current_sort. + if (!_chain->finish_sort_group()) { + // Nothing to do. Wait for more tasks to be added. + if (_chain->_sleeping.empty()) { + PStatTimer timer(_wait_pcollector); + _chain->_cvar.wait(); + } else { + double wake_time = _chain->_sleeping.front()->get_wake_time(); + double now = _chain->_manager->_clock->get_frame_time(); + double timeout = max(wake_time - now, 0.0); + PStatTimer timer(_wait_pcollector); + _chain->_cvar.wait(timeout); + } + } + + } else { + // Wait for the other threads to finish their current task + // before we continue. + PStatTimer timer(_wait_pcollector); + _chain->_cvar.wait(); + } + } + } +} + diff --git a/panda/src/event/asyncTaskChain.h b/panda/src/event/asyncTaskChain.h new file mode 100644 index 0000000000..b6aca289e3 --- /dev/null +++ b/panda/src/event/asyncTaskChain.h @@ -0,0 +1,190 @@ +// Filename: asyncTaskChain.h +// Created by: drose (23Aug06) +// +//////////////////////////////////////////////////////////////////// +// +// PANDA 3D SOFTWARE +// Copyright (c) Carnegie Mellon University. All rights reserved. +// +// All use of this software is subject to the terms of the revised BSD +// license. You should have received a copy of this license along +// with this source code in a file named "LICENSE." +// +//////////////////////////////////////////////////////////////////// + +#ifndef ASYNCTASKCHAIN_H +#define ASYNCTASKCHAIN_H + +#include "pandabase.h" + +#include "asyncTask.h" +#include "asyncTaskCollection.h" +#include "typedReferenceCount.h" +#include "thread.h" +#include "pmutex.h" +#include "conditionVarFull.h" +#include "pvector.h" +#include "pdeque.h" +#include "pStatCollector.h" +#include "clockObject.h" + +class AsyncTaskManager; + +//////////////////////////////////////////////////////////////////// +// Class : AsyncTaskChain +// Description : The AsyncTaskChain is a subset of the +// AsyncTaskManager. Each chain maintains a separate +// list of tasks, and will execute them with its own set +// of threads. Each chain may thereby operate +// independently of the other chains. +// +// The AsyncTaskChain will spawn a specified number of +// threads (possibly 0) to serve the tasks. If there +// are no threads, you must call poll() from time to +// time to serve the tasks in the main thread. Normally +// this is done by calling AsyncTaskManager::poll(). +// +// Each task will run exactly once each epoch. Beyond +// that, the tasks' sort and priority values control the +// order in which they are run: tasks are run in +// increasing order by sort value, and within the same +// sort value, they are run roughly in decreasing order +// by priority value, with some exceptions for +// parallelism. Tasks with different sort values are +// never run in parallel together, but tasks with +// different priority values might be (if there is more +// than one thread). +//////////////////////////////////////////////////////////////////// +class EXPCL_PANDA_EVENT AsyncTaskChain : public TypedReferenceCount, public Namable { +public: + AsyncTaskChain(AsyncTaskManager *manager, const string &name); + ~AsyncTaskChain(); + +PUBLISHED: + INLINE void set_tick_clock(bool tick_clock); + INLINE bool get_tick_clock() const; + + BLOCKING void set_num_threads(int num_threads); + int get_num_threads() const; + int get_num_running_threads() const; + + BLOCKING void stop_threads(); + void start_threads(); + INLINE bool is_started() const; + + bool has_task(AsyncTask *task) const; + + BLOCKING void wait_for_tasks(); + + int get_num_tasks() const; + AsyncTaskCollection get_tasks() const; + AsyncTaskCollection get_active_tasks() const; + AsyncTaskCollection get_sleeping_tasks() const; + + void poll(); + + virtual void output(ostream &out) const; + virtual void write(ostream &out, int indent_level = 0) const; + +protected: + class AsyncTaskChainThread; + typedef pvector< PT(AsyncTask) > TaskHeap; + + void do_add(AsyncTask *task); + bool do_remove(AsyncTask *task); + void do_wait_for_tasks(); + void do_cleanup(); + + bool do_has_task(AsyncTask *task) const; + int find_task_on_heap(const TaskHeap &heap, AsyncTask *task) const; + + void service_one_task(AsyncTaskChainThread *thread); + void cleanup_task(AsyncTask *task, bool clean_exit); + bool finish_sort_group(); + void do_stop_threads(); + void do_start_threads(); + AsyncTaskCollection do_get_active_tasks() const; + AsyncTaskCollection do_get_sleeping_tasks() const; + void do_poll(); + void do_write(ostream &out, int indent_level) const; + +protected: + class AsyncTaskChainThread : public Thread { + public: + AsyncTaskChainThread(const string &name, AsyncTaskChain *chain); + virtual void thread_main(); + + AsyncTaskChain *_chain; + AsyncTask *_servicing; + }; + + class AsyncTaskSortWakeTime { + public: + bool operator () (AsyncTask *a, AsyncTask *b) const { + return a->get_wake_time() > b->get_wake_time(); + } + }; + + class AsyncTaskSortPriority { + public: + bool operator () (AsyncTask *a, AsyncTask *b) const { + if (a->get_sort() != b->get_sort()) { + return a->get_sort() > b->get_sort(); + } + return a->get_priority() < b->get_priority(); + } + }; + + typedef pvector< PT(AsyncTaskChainThread) > Threads; + + AsyncTaskManager *_manager; + + ConditionVarFull _cvar; // signaled when _active, _next_active, _sleeping, _state, or _current_sort changes, or a task finishes. + + enum State { + S_initial, // no threads yet + S_started, // threads have been started + S_aborting, // task returned DS_abort, shutdown requested from sub-thread. + S_shutdown // waiting for thread shutdown, requested from main thread + }; + + bool _tick_clock; + int _num_threads; + Threads _threads; + int _num_busy_threads; + int _num_tasks; + TaskHeap _active; + TaskHeap _next_active; + TaskHeap _sleeping; + State _state; + int _current_sort; + bool _needs_cleanup; + + static PStatCollector _task_pcollector; + static PStatCollector _wait_pcollector; + +public: + static TypeHandle get_class_type() { + return _type_handle; + } + static void init_type() { + TypedReferenceCount::init_type(); + register_type(_type_handle, "AsyncTaskChain", + TypedReferenceCount::get_class_type()); + } + virtual TypeHandle get_type() const { + return get_class_type(); + } + virtual TypeHandle force_init_type() {init_type(); return get_class_type();} + +private: + static TypeHandle _type_handle; + + friend class AsyncTaskChainThread; + friend class AsyncTask; + friend class AsyncTaskManager; +}; + +#include "asyncTaskChain.I" + +#endif diff --git a/panda/src/event/asyncTaskManager.I b/panda/src/event/asyncTaskManager.I index ab3f1a4c51..1329d8faad 100644 --- a/panda/src/event/asyncTaskManager.I +++ b/panda/src/event/asyncTaskManager.I @@ -13,19 +13,6 @@ //////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////// -// Function: AsyncTaskManager::is_started -// Access: Published -// Description: Returns true if the thread(s) have been started and -// are ready to service requests, false otherwise. If -// this is false, the next call to add() or add_and_do() -// will automatically start the threads. -//////////////////////////////////////////////////////////////////// -INLINE bool AsyncTaskManager:: -is_started() const { - return (_state == S_started); -} - //////////////////////////////////////////////////////////////////// // Function: AsyncTaskManager::set_clock // Access: Published @@ -53,55 +40,6 @@ get_clock() { return _clock; } -//////////////////////////////////////////////////////////////////// -// Function: AsyncTaskManager::set_tick_clock -// Access: Published -// Description: Sets the tick_clock flag. When this is true, -// get_clock()->tick() will be called automatically at -// each task epoch. This is false by default. -//////////////////////////////////////////////////////////////////// -INLINE void AsyncTaskManager:: -set_tick_clock(bool clock) { - _tick_clock = clock; -} - -//////////////////////////////////////////////////////////////////// -// Function: AsyncTaskManager::get_tick_clock -// Access: Published -// Description: Returns the tick_clock flag.. See set_tick_clock(). -//////////////////////////////////////////////////////////////////// -INLINE bool AsyncTaskManager:: -get_tick_clock() const { - return _tick_clock; -} - -//////////////////////////////////////////////////////////////////// -// Function: AsyncTaskManager::get_num_threads -// Access: Published -// Description: Returns the number of threads that will be servicing -// tasks for this manager. Also see -// get_num_running_threads(). -//////////////////////////////////////////////////////////////////// -INLINE int AsyncTaskManager:: -get_num_threads() const { - MutexHolder holder(_lock); - return _num_threads; -} - -//////////////////////////////////////////////////////////////////// -// Function: AsyncTaskManager::get_num_running_threads -// Access: Published -// Description: Returns the number of threads that have been created -// and are actively running. This will return 0 before -// the threads have been started; it will also return 0 -// if thread support is not available. -//////////////////////////////////////////////////////////////////// -INLINE int AsyncTaskManager:: -get_num_running_threads() const { - MutexHolder holder(_lock); - return _threads.size(); -} - //////////////////////////////////////////////////////////////////// // Function: AsyncTaskManager::get_num_tasks // Access: Published diff --git a/panda/src/event/asyncTaskManager.cxx b/panda/src/event/asyncTaskManager.cxx index f123830a51..4a3d8b9a65 100644 --- a/panda/src/event/asyncTaskManager.cxx +++ b/panda/src/event/asyncTaskManager.cxx @@ -22,31 +22,21 @@ #include "pStatClient.h" #include "pStatTimer.h" #include "clockObject.h" +#include "config_event.h" #include TypeHandle AsyncTaskManager::_type_handle; -PStatCollector AsyncTaskManager::_task_pcollector("Task"); -PStatCollector AsyncTaskManager::_wait_pcollector("Wait"); - //////////////////////////////////////////////////////////////////// // Function: AsyncTaskManager::Constructor // Access: Published // Description: //////////////////////////////////////////////////////////////////// AsyncTaskManager:: -AsyncTaskManager(const string &name, int num_threads) : +AsyncTaskManager(const string &name) : Namable(name), - _num_threads(0), - _cvar(_lock), - _num_tasks(0), - _num_busy_threads(0), - _state(S_initial), - _current_sort(INT_MAX), - _clock(ClockObject::get_global_clock()), - _tick_clock(false) + _clock(ClockObject::get_global_clock()) { - set_num_threads(num_threads); } //////////////////////////////////////////////////////////////////// @@ -56,75 +46,100 @@ AsyncTaskManager(const string &name, int num_threads) : //////////////////////////////////////////////////////////////////// AsyncTaskManager:: ~AsyncTaskManager() { - stop_threads(); - - TaskHeap::const_iterator ti; - for (ti = _active.begin(); ti != _active.end(); ++ti) { - AsyncTask *task = (*ti); - cleanup_task(task, false); - } - for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) { - AsyncTask *task = (*ti); - cleanup_task(task, false); - } - for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) { - AsyncTask *task = (*ti); - cleanup_task(task, false); - } -} - -//////////////////////////////////////////////////////////////////// -// Function: AsyncTaskManager::set_num_threads -// Access: Published -// Description: Changes the number of threads for this task manager. -// This may require stopping the threads if they are -// already running. -//////////////////////////////////////////////////////////////////// -void AsyncTaskManager:: -set_num_threads(int num_threads) { - nassertv(num_threads >= 0); - - if (!Thread::is_threading_supported()) { - num_threads = 0; - } - MutexHolder holder(_lock); - if (_num_threads != num_threads) { - do_stop_threads(); - _num_threads = num_threads; + + TaskChains::iterator tci; + for (tci = _task_chains.begin(); + tci != _task_chains.end(); + ++tci) { + AsyncTaskChain *chain = (*tci); + chain->do_cleanup(); } } //////////////////////////////////////////////////////////////////// -// Function: AsyncTaskManager::stop_threads +// Function: AsyncTaskManager::get_num_task_chains // Access: Published -// Description: Stops any threads that are currently running. If any -// tasks are still pending and have not yet been picked -// up by a thread, they will not be serviced unless -// poll() or start_threads() is later called. +// Description: Returns the number of different task chains. //////////////////////////////////////////////////////////////////// -void AsyncTaskManager:: -stop_threads() { - if (_state == S_started || _state == S_aborting) { - // Clean up all of the threads. - MutexHolder holder(_lock); - do_stop_threads(); - } +int AsyncTaskManager:: +get_num_task_chains() const { + MutexHolder holder(_lock); + return _task_chains.size(); } //////////////////////////////////////////////////////////////////// -// Function: AsyncTaskManager::start_threads +// Function: AsyncTaskManager::get_task_chain // Access: Published -// Description: Starts any requested threads to service the tasks on -// the queue. This is normally not necessary, since -// adding a task will start the threads automatically. +// Description: Returns the nth task chain. //////////////////////////////////////////////////////////////////// -void AsyncTaskManager:: -start_threads() { - if (_state == S_initial || _state == S_aborting) { - MutexHolder holder(_lock); - do_start_threads(); +AsyncTaskChain *AsyncTaskManager:: +get_task_chain(int n) const { + MutexHolder holder(_lock); + nassertr(n >= 0 && n < (int)_task_chains.size(), NULL); + return _task_chains[n]; +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskManager::make_task_chain +// Access: Published +// Description: Creates a new AsyncTaskChain of the indicated name +// and stores it within the AsyncTaskManager. If a task +// chain with this name already exists, returns it +// instead. +//////////////////////////////////////////////////////////////////// +AsyncTaskChain *AsyncTaskManager:: +make_task_chain(const string &name) { + MutexHolder holder(_lock); + return do_make_task_chain(name); +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskManager::find_task_chain +// Access: Protected +// Description: Searches a new AsyncTaskChain of the indicated name +// and returns it if it exists, or NULL otherwise. +//////////////////////////////////////////////////////////////////// +AsyncTaskChain *AsyncTaskManager:: +find_task_chain(const string &name) { + MutexHolder holder(_lock); + return do_find_task_chain(name); +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskManager::remove_task_chain +// Access: Protected +// Description: Removes the AsyncTaskChain of the indicated name. +// If the chain still has tasks, this will block until +// all tasks are finished. +// +// Returns true if successful, or false if the chain did +// not exist. +//////////////////////////////////////////////////////////////////// +bool AsyncTaskManager:: +remove_task_chain(const string &name) { + MutexHolder holder(_lock); + + PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name); + TaskChains::iterator tci = _task_chains.find(chain); + if (tci == _task_chains.end()) { + // No chain. + return false; } + + chain = (*tci); + + while (chain->_num_tasks != 0) { + // Still has tasks. + event_cat.info() + << "Waiting for tasks on chain " << name << " to finish.\n"; + chain->do_wait_for_tasks(); + } + + // Safe to remove. + chain->do_cleanup(); + _task_chains.erase(tci); + return true; } //////////////////////////////////////////////////////////////////// @@ -151,35 +166,16 @@ add(AsyncTask *task) { task->_state == AsyncTask::S_inactive); nassertv(!do_has_task(task)); - do_start_threads(); - - task->_manager = this; add_task_by_name(task); - double now = _clock->get_frame_time(); - task->_start_time = now; - - if (task->has_delay()) { - // This is a deferred task. Add it to the sleeping queue. - task->_wake_time = now + task->get_delay(); - task->_state = AsyncTask::S_sleeping; - _sleeping.push_back(task); - push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime()); - - } else { - // This is an active task. Add it to the active set. - task->_state = AsyncTask::S_active; - if (task->get_sort() > _current_sort) { - // It will run this frame. - _active.push_back(task); - push_heap(_active.begin(), _active.end(), AsyncTaskSortPriority()); - } else { - // It will run next frame. - _next_active.push_back(task); - } + AsyncTaskChain *chain = do_find_task_chain(task->_chain_name); + if (chain == (AsyncTaskChain *)NULL) { + event_cat.warning() + << "Creating implicit AsyncTaskChain " << task->_chain_name + << " for " << get_type() << " " << get_name() << "\n"; + chain = do_make_task_chain(task->_chain_name); } - ++_num_tasks; - _cvar.signal_all(); + chain->do_add(task); } //////////////////////////////////////////////////////////////////// @@ -311,43 +307,9 @@ remove(const AsyncTaskCollection &tasks) { // Not a member of this manager, or already removed. nassertr(!do_has_task(task), num_removed); } else { - switch (task->_state) { - case AsyncTask::S_servicing: - // This task is being serviced. - task->_state = AsyncTask::S_servicing_removed; - break; - - case AsyncTask::S_servicing_removed: - // Being serviced, though it will be removed later. - break; - - case AsyncTask::S_sleeping: - // Sleeping, easy. - { - int index = find_task_on_heap(_sleeping, task); - nassertr(index != -1, num_removed); - _sleeping.erase(_sleeping.begin() + index); - make_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime()); - ++num_removed; - cleanup_task(task, false); - } - break; - - case AsyncTask::S_active: - { - // Active, but not being serviced, easy. - int index = find_task_on_heap(_active, task); - if (index != -1) { - _active.erase(_active.begin() + index); - make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority()); - } else { - index = find_task_on_heap(_next_active, task); - nassertr(index != -1, num_removed); - _next_active.erase(_next_active.begin() + index); - } - ++num_removed; - cleanup_task(task, false); - } + nassertr(task->_chain->_manager == this, num_removed); + if (task->_chain->do_remove(task)) { + ++num_removed; } } } @@ -364,27 +326,56 @@ void AsyncTaskManager:: wait_for_tasks() { MutexHolder holder(_lock); - do_start_threads(); - - if (_threads.empty()) { - // Non-threaded case. - while (_num_tasks > 0) { - if (_state == S_shutdown || _state == S_aborting) { - return; - } - do_poll(); + // Wait for each of our task chains to finish. + while (_num_tasks > 0) { + TaskChains::iterator tci; + for (tci = _task_chains.begin(); + tci != _task_chains.end(); + ++tci) { + AsyncTaskChain *chain = (*tci); + chain->do_wait_for_tasks(); } + } +} - } else { - // Threaded case. - while (_num_tasks > 0) { - if (_state == S_shutdown || _state == S_aborting) { - return; - } - - PStatTimer timer(_wait_pcollector); - _cvar.wait(); - } +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskManager::stop_threads +// Access: Published +// Description: Stops any threads that are currently running. If any +// tasks are still pending and have not yet been picked +// up by a thread, they will not be serviced unless +// poll() or start_threads() is later called. +//////////////////////////////////////////////////////////////////// +void AsyncTaskManager:: +stop_threads() { + MutexHolder holder(_lock); + + TaskChains::iterator tci; + for (tci = _task_chains.begin(); + tci != _task_chains.end(); + ++tci) { + AsyncTaskChain *chain = (*tci); + chain->do_stop_threads(); + } +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskManager::start_threads +// Access: Published +// Description: Starts any requested threads to service the tasks on +// the queue. This is normally not necessary, since +// adding a task will start the threads automatically. +//////////////////////////////////////////////////////////////////// +void AsyncTaskManager:: +start_threads() { + MutexHolder holder(_lock); + + TaskChains::iterator tci; + for (tci = _task_chains.begin(); + tci != _task_chains.end(); + ++tci) { + AsyncTaskChain *chain = (*tci); + chain->do_start_threads(); } } @@ -395,9 +386,19 @@ wait_for_tasks() { // on the task manager, at the time of the call. //////////////////////////////////////////////////////////////////// AsyncTaskCollection AsyncTaskManager:: -get_tasks() { - AsyncTaskCollection result = get_active_tasks(); - result.add_tasks_from(get_sleeping_tasks()); +get_tasks() const { + MutexHolder holder(_lock); + + AsyncTaskCollection result; + TaskChains::const_iterator tci; + for (tci = _task_chains.begin(); + tci != _task_chains.end(); + ++tci) { + AsyncTaskChain *chain = (*tci); + result.add_tasks_from(chain->do_get_active_tasks()); + result.add_tasks_from(chain->do_get_sleeping_tasks()); + } + return result; } @@ -409,24 +410,16 @@ get_tasks() { // call. //////////////////////////////////////////////////////////////////// AsyncTaskCollection AsyncTaskManager:: -get_active_tasks() { - AsyncTaskCollection result; +get_active_tasks() const { + MutexHolder holder(_lock); - Threads::const_iterator thi; - for (thi = _threads.begin(); thi != _threads.end(); ++thi) { - AsyncTask *task = (*thi)->_servicing; - if (task != (AsyncTask *)NULL) { - result.add_task(task); - } - } - TaskHeap::const_iterator ti; - for (ti = _active.begin(); ti != _active.end(); ++ti) { - AsyncTask *task = (*ti); - result.add_task(task); - } - for (ti = _next_active.begin(); ti != _next_active.end(); ++ti) { - AsyncTask *task = (*ti); - result.add_task(task); + AsyncTaskCollection result; + TaskChains::const_iterator tci; + for (tci = _task_chains.begin(); + tci != _task_chains.end(); + ++tci) { + AsyncTaskChain *chain = (*tci); + result.add_tasks_from(chain->do_get_active_tasks()); } return result; @@ -440,13 +433,16 @@ get_active_tasks() { // call. //////////////////////////////////////////////////////////////////// AsyncTaskCollection AsyncTaskManager:: -get_sleeping_tasks() { - AsyncTaskCollection result; +get_sleeping_tasks() const { + MutexHolder holder(_lock); - TaskHeap::const_iterator ti; - for (ti = _sleeping.begin(); ti != _sleeping.end(); ++ti) { - AsyncTask *task = (*ti); - result.add_task(task); + AsyncTaskCollection result; + TaskChains::const_iterator tci; + for (tci = _task_chains.begin(); + tci != _task_chains.end(); + ++tci) { + AsyncTaskChain *chain = (*tci); + result.add_tasks_from(chain->do_get_sleeping_tasks()); } return result; @@ -464,13 +460,14 @@ get_sleeping_tasks() { void AsyncTaskManager:: poll() { MutexHolder holder(_lock); - do_start_threads(); - if (!_threads.empty()) { - return; + TaskChains::iterator tci; + for (tci = _task_chains.begin(); + tci != _task_chains.end(); + ++tci) { + AsyncTaskChain *chain = (*tci); + chain->do_poll(); } - - do_poll(); } //////////////////////////////////////////////////////////////////// @@ -497,66 +494,12 @@ write(ostream &out, int indent_level) const { indent(out, indent_level) << get_type() << " " << get_name() << "\n"; - // Collect a list of all active tasks, then sort them into order for - // output. - TaskHeap tasks = _active; - tasks.insert(tasks.end(), _next_active.begin(), _next_active.end()); - - Threads::const_iterator thi; - for (thi = _threads.begin(); thi != _threads.end(); ++thi) { - AsyncTask *task = (*thi)->_servicing; - if (task != (AsyncTask *)NULL && - task->_state != AsyncTask::S_servicing_removed) { - tasks.push_back(task); - } - } - - if (!tasks.empty()) { - indent(out, indent_level + 2) - << "Active tasks:\n"; - - sort(tasks.begin(), tasks.end(), AsyncTaskSortPriority()); - - // Since AsyncTaskSortPriority() sorts backwards (because of STL's - // push_heap semantics), we go through the task list in reverse - // order to print them forwards. - TaskHeap::reverse_iterator ti; - int current_sort = tasks.back()->get_sort() - 1; - for (ti = tasks.rbegin(); ti != tasks.rend(); ++ti) { - AsyncTask *task = (*ti); - if (task->get_sort() != current_sort) { - current_sort = task->get_sort(); - indent(out, indent_level + 2) - << "sort = " << current_sort << "\n"; - } - if (task->_state == AsyncTask::S_servicing) { - indent(out, indent_level + 3) - << "*" << *task << "\n"; - } else { - indent(out, indent_level + 4) - << *task << "\n"; - } - } - } - - if (!_sleeping.empty()) { - indent(out, indent_level + 2) - << "Sleeping tasks:\n"; - double now = _clock->get_frame_time(); - - // Instead of iterating through the _sleeping list in heap order, - // copy it and then use repeated pops to get it out in sorted - // order, for the user's satisfaction. - TaskHeap sleeping = _sleeping; - while (!sleeping.empty()) { - PT(AsyncTask) task = sleeping.front(); - pop_heap(sleeping.begin(), sleeping.end(), AsyncTaskSortWakeTime()); - sleeping.pop_back(); - - indent(out, indent_level + 4) - << task->get_wake_time() - now << "s: " - << *task << "\n"; - } + TaskChains::const_iterator tci; + for (tci = _task_chains.begin(); + tci != _task_chains.end(); + ++tci) { + AsyncTaskChain *chain = (*tci); + chain->do_write(out, indent_level + 2); } } @@ -570,29 +513,55 @@ write(ostream &out, int indent_level) const { //////////////////////////////////////////////////////////////////// bool AsyncTaskManager:: do_has_task(AsyncTask *task) const { - return (find_task_on_heap(_active, task) != -1 || - find_task_on_heap(_next_active, task) != -1 || - find_task_on_heap(_sleeping, task) != -1); -} - -//////////////////////////////////////////////////////////////////// -// Function: AsyncTaskManager::find_task_on_heap -// Access: Protected -// Description: Returns the index number of the indicated task within -// the specified task list, or -1 if the task is not -// found in the list (this may mean that it is currently -// being serviced). Assumes that the lock is currently -// held. -//////////////////////////////////////////////////////////////////// -int AsyncTaskManager:: -find_task_on_heap(const TaskHeap &heap, AsyncTask *task) const { - for (int i = 0; i < (int)heap.size(); ++i) { - if (heap[i] == task) { - return i; + TaskChains::const_iterator tci; + for (tci = _task_chains.begin(); + tci != _task_chains.end(); + ++tci) { + AsyncTaskChain *chain = (*tci); + if (chain->do_has_task(task)) { + return true; } } - return -1; + return false; +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskManager::do_make_task_chain +// Access: Protected +// Description: Creates a new AsyncTaskChain of the indicated name +// and stores it within the AsyncTaskManager. If a task +// chain with this name already exists, returns it +// instead. +// +// Assumes the lock is held. +//////////////////////////////////////////////////////////////////// +AsyncTaskChain *AsyncTaskManager:: +do_make_task_chain(const string &name) { + PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name); + + TaskChains::const_iterator tci = _task_chains.insert(chain).first; + return (*tci); +} + +//////////////////////////////////////////////////////////////////// +// Function: AsyncTaskManager::do_find_task_chain +// Access: Protected +// Description: Searches a new AsyncTaskChain of the indicated name +// and returns it if it exists, or NULL otherwise. +// +// Assumes the lock is held. +//////////////////////////////////////////////////////////////////// +AsyncTaskChain *AsyncTaskManager:: +do_find_task_chain(const string &name) { + PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name); + + TaskChains::const_iterator tci = _task_chains.find(chain); + if (tci != _task_chains.end()) { + return (*tci); + } + + return NULL; } //////////////////////////////////////////////////////////////////// @@ -624,307 +593,3 @@ remove_task_by_name(AsyncTask *task) { nassertv(false); } } - -//////////////////////////////////////////////////////////////////// -// Function: AsyncTaskManager::service_one_task -// Access: Protected -// Description: Pops a single task off the active queue, services it, -// and restores it to the end of the queue. This is -// called internally only within one of the task -// threads. Assumes the lock is already held. -//////////////////////////////////////////////////////////////////// -void AsyncTaskManager:: -service_one_task(AsyncTaskManager::AsyncTaskManagerThread *thread) { - if (!_active.empty()) { - PT(AsyncTask) task = _active.front(); - pop_heap(_active.begin(), _active.end(), AsyncTaskSortPriority()); - _active.pop_back(); - - if (thread != (AsyncTaskManager::AsyncTaskManagerThread *)NULL) { - thread->_servicing = task; - } - - nassertv(task->get_sort() == _current_sort); - nassertv(task->_state == AsyncTask::S_active); - task->_state = AsyncTask::S_servicing; - task->_servicing_thread = thread; - - // Now release the manager lock while we actually service the - // task. - _lock.release(); - AsyncTask::DoneStatus ds = task->do_task(); - - // Now we have to re-acquire the manager lock, so we can put the - // task back on the queue (and so we can return with the lock - // still held). - _lock.lock(); - - if (thread != (AsyncTaskManager::AsyncTaskManagerThread *)NULL) { - thread->_servicing = NULL; - } - task->_servicing_thread = NULL; - - if (task->_manager == this) { - if (task->_state == AsyncTask::S_servicing_removed) { - // This task wants to kill itself. - cleanup_task(task, false); - - } else { - switch (ds) { - case AsyncTask::DS_cont: - // The task is still alive; put it on the next frame's active - // queue. - task->_state = AsyncTask::S_active; - _next_active.push_back(task); - _cvar.signal_all(); - break; - - case AsyncTask::DS_again: - // The task wants to sleep again. - { - double now = _clock->get_frame_time(); - task->_wake_time = now + task->get_delay(); - task->_state = AsyncTask::S_sleeping; - _sleeping.push_back(task); - push_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime()); - _cvar.signal_all(); - } - break; - - case AsyncTask::DS_abort: - // The task had an exception and wants to raise a big flag. - cleanup_task(task, false); - if (_state == S_started) { - _state = S_aborting; - _cvar.signal_all(); - } - break; - - default: - // The task has finished. - cleanup_task(task, true); - } - } - } - } -} - -//////////////////////////////////////////////////////////////////// -// Function: AsyncTaskManager::cleanup_task -// Access: Protected -// Description: Called internally when a task has completed (or been -// interrupted) and is about to be removed from the -// active queue. Assumes the lock is held. -//////////////////////////////////////////////////////////////////// -void AsyncTaskManager:: -cleanup_task(AsyncTask *task, bool clean_exit) { - nassertv(task->_manager == this); - - task->_state = AsyncTask::S_inactive; - task->_manager = NULL; - --_num_tasks; - - remove_task_by_name(task); - - if (clean_exit && !task->_done_event.empty()) { - PT_Event event = new Event(task->_done_event); - event->add_parameter(EventParameter(task)); - throw_event(event); - } -} - -//////////////////////////////////////////////////////////////////// -// Function: AsyncTaskManager::finish_sort_group -// Access: Protected -// Description: Called internally when all tasks of a given sort -// value have been completed, and it is time to -// increment to the next sort value, or begin the next -// epoch. Assumes the lock is held. -// -// Returns true if there are more tasks on the queue -// after this operation, or false if the task list is -// empty and we need to wait. -//////////////////////////////////////////////////////////////////// -bool AsyncTaskManager:: -finish_sort_group() { - nassertr(_num_busy_threads == 0, true); - - if (!_active.empty()) { - // There are more tasks; just set the next sort value. - nassertr(_current_sort < _active.front()->get_sort(), true); - _current_sort = _active.front()->get_sort(); - _cvar.signal_all(); - return true; - } - - // There are no more tasks in this epoch; advance to the next epoch. - if (_tick_clock) { - _clock->tick(); - } - if (!_threads.empty()) { - PStatClient::thread_tick(get_name()); - } - - _active.swap(_next_active); - - // Check for any sleeping tasks that need to be woken. - double now = _clock->get_frame_time(); - while (!_sleeping.empty() && _sleeping.front()->get_wake_time() <= now) { - PT(AsyncTask) task = _sleeping.front(); - pop_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime()); - _sleeping.pop_back(); - task->_state = AsyncTask::S_active; - _active.push_back(task); - } - - make_heap(_active.begin(), _active.end(), AsyncTaskSortPriority()); - nassertr(_num_tasks == _active.size() + _sleeping.size(), true); - - if (!_active.empty()) { - // Get the first task on the queue. - _current_sort = _active.front()->get_sort(); - _cvar.signal_all(); - return true; - } - - // There are no tasks to be had anywhere. Chill. - _current_sort = INT_MAX; - return false; -} - -//////////////////////////////////////////////////////////////////// -// Function: AsyncTaskManager::do_stop_threads -// Access: Protected -// Description: The private implementation of stop_threads; assumes -// the lock is already held. -//////////////////////////////////////////////////////////////////// -void AsyncTaskManager:: -do_stop_threads() { - if (_state == S_started || _state == S_aborting) { - _state = S_shutdown; - _cvar.signal_all(); - - Threads wait_threads; - wait_threads.swap(_threads); - - // We have to release the lock while we join, so the threads can - // wake up and see that we're shutting down. - _lock.release(); - Threads::iterator ti; - for (ti = wait_threads.begin(); ti != wait_threads.end(); ++ti) { - (*ti)->join(); - } - _lock.lock(); - - _state = S_initial; - nassertv(_num_busy_threads == 0); - } -} - -//////////////////////////////////////////////////////////////////// -// Function: AsyncTaskManager::do_start_threads -// Access: Protected -// Description: The private implementation of start_threads; assumes -// the lock is already held. -//////////////////////////////////////////////////////////////////// -void AsyncTaskManager:: -do_start_threads() { - if (_state == S_aborting) { - do_stop_threads(); - } - - if (_state == S_initial) { - _state = S_started; - _num_busy_threads = 0; - if (Thread::is_threading_supported()) { - _threads.reserve(_num_threads); - for (int i = 0; i < _num_threads; ++i) { - ostringstream strm; - strm << get_name() << "_" << i; - PT(AsyncTaskManagerThread) thread = new AsyncTaskManagerThread(strm.str(), this); - if (thread->start(TP_low, true)) { - _threads.push_back(thread); - } - } - } - } -} - -//////////////////////////////////////////////////////////////////// -// Function: AsyncTaskManager::do_poll -// Access: Protected -// Description: The private implementation of poll(), this assumes -// the lock is already held. -//////////////////////////////////////////////////////////////////// -void AsyncTaskManager:: -do_poll() { - while (!_active.empty() && _state != S_shutdown && _state != S_aborting) { - _current_sort = _active.front()->get_sort(); - service_one_task(NULL); - } - - if (_state != S_shutdown && _state != S_aborting) { - finish_sort_group(); - } -} - -//////////////////////////////////////////////////////////////////// -// Function: AsyncTaskManager::AsyncTaskManagerThread::Constructor -// Access: Public -// Description: -//////////////////////////////////////////////////////////////////// -AsyncTaskManager::AsyncTaskManagerThread:: -AsyncTaskManagerThread(const string &name, AsyncTaskManager *manager) : - Thread(name, manager->get_name()), - _manager(manager), - _servicing(NULL) -{ -} - -//////////////////////////////////////////////////////////////////// -// Function: AsyncTaskManager::AsyncTaskManagerThread::thread_main -// Access: Public, Virtual -// Description: -//////////////////////////////////////////////////////////////////// -void AsyncTaskManager::AsyncTaskManagerThread:: -thread_main() { - MutexHolder holder(_manager->_lock); - while (_manager->_state != S_shutdown && _manager->_state != S_aborting) { - if (!_manager->_active.empty() && - _manager->_active.front()->get_sort() == _manager->_current_sort) { - PStatTimer timer(_task_pcollector); - _manager->_num_busy_threads++; - _manager->service_one_task(this); - _manager->_num_busy_threads--; - _manager->_cvar.signal_all(); - - } else { - // We've finished all the available tasks of the current sort - // value. We can't pick up a new task until all of the threads - // finish the tasks with the same sort value. - if (_manager->_num_busy_threads == 0) { - // We're the last thread to finish. Update _current_sort. - if (!_manager->finish_sort_group()) { - // Nothing to do. Wait for more tasks to be added. - if (_manager->_sleeping.empty()) { - PStatTimer timer(_wait_pcollector); - _manager->_cvar.wait(); - } else { - double wake_time = _manager->_sleeping.front()->get_wake_time(); - double now = _manager->_clock->get_frame_time(); - double timeout = max(wake_time - now, 0.0); - PStatTimer timer(_wait_pcollector); - _manager->_cvar.wait(timeout); - } - } - - } else { - // Wait for the other threads to finish their current task - // before we continue. - PStatTimer timer(_wait_pcollector); - _manager->_cvar.wait(); - } - } - } -} - diff --git a/panda/src/event/asyncTaskManager.h b/panda/src/event/asyncTaskManager.h index 2602245059..7f87542ff9 100644 --- a/panda/src/event/asyncTaskManager.h +++ b/panda/src/event/asyncTaskManager.h @@ -19,6 +19,7 @@ #include "asyncTask.h" #include "asyncTaskCollection.h" +#include "asyncTaskChain.h" #include "typedReferenceCount.h" #include "thread.h" #include "pmutex.h" @@ -27,6 +28,8 @@ #include "pdeque.h" #include "pStatCollector.h" #include "clockObject.h" +#include "ordered_vector.h" +#include "indirectCompareNames.h" //////////////////////////////////////////////////////////////////// // Class : AsyncTaskManager @@ -52,22 +55,17 @@ //////////////////////////////////////////////////////////////////// class EXPCL_PANDA_EVENT AsyncTaskManager : public TypedReferenceCount, public Namable { PUBLISHED: - AsyncTaskManager(const string &name, int num_threads = 0); + AsyncTaskManager(const string &name); BLOCKING virtual ~AsyncTaskManager(); INLINE void set_clock(ClockObject *clock); INLINE ClockObject *get_clock(); - INLINE void set_tick_clock(bool tick_clock); - INLINE bool get_tick_clock() const; - - BLOCKING void set_num_threads(int num_threads); - INLINE int get_num_threads() const; - INLINE int get_num_running_threads() const; - - BLOCKING void stop_threads(); - void start_threads(); - INLINE bool is_started() const; + int get_num_task_chains() const; + AsyncTaskChain *get_task_chain(int n) const; + AsyncTaskChain *make_task_chain(const string &name); + AsyncTaskChain *find_task_chain(const string &name); + BLOCKING bool remove_task_chain(const string &name); void add(AsyncTask *task); bool has_task(AsyncTask *task) const; @@ -80,12 +78,14 @@ PUBLISHED: int remove(const AsyncTaskCollection &tasks); BLOCKING void wait_for_tasks(); + BLOCKING void stop_threads(); + void start_threads(); INLINE int get_num_tasks() const; - AsyncTaskCollection get_tasks(); - AsyncTaskCollection get_active_tasks(); - AsyncTaskCollection get_sleeping_tasks(); + AsyncTaskCollection get_tasks() const; + AsyncTaskCollection get_active_tasks() const; + AsyncTaskCollection get_sleeping_tasks() const; void poll(); @@ -93,49 +93,15 @@ PUBLISHED: virtual void write(ostream &out, int indent_level = 0) const; protected: - class AsyncTaskManagerThread; - typedef pvector< PT(AsyncTask) > TaskHeap; - - bool do_has_task(AsyncTask *task) const; - int find_task_on_heap(const TaskHeap &heap, AsyncTask *task) const; + AsyncTaskChain *do_make_task_chain(const string &name); + AsyncTaskChain *do_find_task_chain(const string &name); INLINE void add_task_by_name(AsyncTask *task); void remove_task_by_name(AsyncTask *task); - void service_one_task(AsyncTaskManagerThread *thread); - void cleanup_task(AsyncTask *task, bool clean_exit); - bool finish_sort_group(); - void do_stop_threads(); - void do_start_threads(); - void do_poll(); + bool do_has_task(AsyncTask *task) const; protected: - class AsyncTaskManagerThread : public Thread { - public: - AsyncTaskManagerThread(const string &name, AsyncTaskManager *manager); - virtual void thread_main(); - - AsyncTaskManager *_manager; - AsyncTask *_servicing; - }; - - class AsyncTaskSortWakeTime { - public: - bool operator () (AsyncTask *a, AsyncTask *b) const { - return a->get_wake_time() > b->get_wake_time(); - } - }; - - class AsyncTaskSortPriority { - public: - bool operator () (AsyncTask *a, AsyncTask *b) const { - if (a->get_sort() != b->get_sort()) { - return a->get_sort() > b->get_sort(); - } - return a->get_priority() < b->get_priority(); - } - }; - class AsyncTaskSortName { public: bool operator () (AsyncTask *a, AsyncTask *b) const { @@ -143,35 +109,18 @@ protected: } }; - typedef pvector< PT(AsyncTaskManagerThread) > Threads; typedef pmultiset TasksByName; - int _num_threads; + // Protects all the following members. This same lock is also used + // to protect all of our AsyncTaskChain members. + Mutex _lock; - Mutex _lock; // Protects all the following members. - ConditionVarFull _cvar; // signaled when _active, _next_active, _sleeping, _state, or _current_sort changes, or a task finishes. + typedef ov_set > TaskChains; + TaskChains _task_chains; - enum State { - S_initial, // no threads yet - S_started, // threads have been started - S_aborting, // task returned DS_abort, shutdown requested from sub-thread. - S_shutdown // waiting for thread shutdown, requested from main thread - }; - - Threads _threads; int _num_tasks; - int _num_busy_threads; - TaskHeap _active; - TaskHeap _next_active; - TaskHeap _sleeping; TasksByName _tasks_by_name; - State _state; - int _current_sort; PT(ClockObject) _clock; - bool _tick_clock; - - static PStatCollector _task_pcollector; - static PStatCollector _wait_pcollector; public: static TypeHandle get_class_type() { @@ -190,7 +139,8 @@ public: private: static TypeHandle _type_handle; - friend class AsyncTaskManagerThread; + friend class AsyncTaskChain; + friend class AsyncTaskChain::AsyncTaskChainThread; friend class AsyncTask; }; diff --git a/panda/src/event/config_event.cxx b/panda/src/event/config_event.cxx index 719db5f0f8..5c54ec6678 100644 --- a/panda/src/event/config_event.cxx +++ b/panda/src/event/config_event.cxx @@ -14,6 +14,7 @@ #include "config_event.h" #include "asyncTask.h" +#include "asyncTaskChain.h" #include "asyncTaskManager.h" #include "buttonEventList.h" #include "event.h" @@ -29,6 +30,7 @@ NotifyCategoryDef(event, ""); ConfigureFn(config_event) { AsyncTask::init_type(); + AsyncTaskChain::init_type(); AsyncTaskManager::init_type(); ButtonEventList::init_type(); PointerEventList::init_type(); diff --git a/panda/src/event/event_composite1.cxx b/panda/src/event/event_composite1.cxx index ec47a7e4b1..ca66ff6c8b 100644 --- a/panda/src/event/event_composite1.cxx +++ b/panda/src/event/event_composite1.cxx @@ -1,4 +1,5 @@ #include "asyncTask.cxx" +#include "asyncTaskChain.cxx" #include "asyncTaskCollection.cxx" #include "asyncTaskManager.cxx" #include "buttonEvent.cxx" diff --git a/panda/src/event/test_task.cxx b/panda/src/event/test_task.cxx index e32692166b..da39308da9 100644 --- a/panda/src/event/test_task.cxx +++ b/panda/src/event/test_task.cxx @@ -49,8 +49,10 @@ static const int num_threads = 10; int main(int argc, char *argv[]) { - PT(AsyncTaskManager) task_mgr = new AsyncTaskManager("task_mgr", num_threads); - task_mgr->set_tick_clock(true); + PT(AsyncTaskManager) task_mgr = new AsyncTaskManager("task_mgr"); + PT(AsyncTaskChain) chain = task_mgr->make_task_chain(""); + chain->set_tick_clock(true); + chain->set_num_threads(num_threads); PerlinNoise2 length_noise(grid_size, grid_size); PerlinNoise2 delay_noise(grid_size, grid_size);