event: Improve interface for adding tasks from C++

The order of arguments is now the same as in Python (with the callable first), sort and priority arguments have been added, and new tasks can be added directly to a chain
This commit is contained in:
rdb 2022-12-26 19:10:01 +01:00
parent 2aa2a35a9f
commit 0747f46055
8 changed files with 101 additions and 31 deletions

View File

@ -21,6 +21,46 @@ is_started() const {
return (_state == S_started); return (_state == S_started);
} }
#ifndef CPPPARSER
/**
* Adds a new task to the task chain which calls the indicated callable.
* This method is defined as a more convenient alternative to subclassing
* AsyncTask.
*
* This given callable allowed to be any object defining a call operator that
* accepts an AsyncTask pointer and returns a DoneStatus.
*
* Returns the newly created AsyncTask object.
*
* @since 1.11.0
*/
template<class Callable>
INLINE AsyncTask *AsyncTaskChain::
add(Callable callable, const std::string &name, int sort, int priority) {
class InlineTask final : public AsyncTask {
public:
InlineTask(Callable callable, const std::string &name, int sort, int priority) :
AsyncTask(name),
_callable(std::move(callable)) {
_sort = sort;
_priority = priority;
}
ALLOC_DELETED_CHAIN(InlineTask);
private:
virtual DoneStatus do_task() override final {
return _callable(this);
}
Callable _callable;
};
AsyncTask *task = new InlineTask(std::move(callable), name, sort, priority);
add(task);
return task;
}
#endif
/** /**
* Returns the time at which the next sleeping thread will awaken, or -1 if * Returns the time at which the next sleeping thread will awaken, or -1 if
* there are no sleeping threads. Assumes the lock is already held. * there are no sleeping threads. Assumes the lock is already held.

View File

@ -37,14 +37,15 @@ PStatCollector AsyncTaskChain::_wait_pcollector("Wait");
* *
*/ */
AsyncTaskChain:: AsyncTaskChain::
AsyncTaskChain(AsyncTaskManager *manager, const string &name) : AsyncTaskChain(AsyncTaskManager *manager, const string &name, int num_threads,
ThreadPriority thread_priority) :
Namable(name), Namable(name),
_manager(manager), _manager(manager),
_cvar(manager->_lock), _cvar(manager->_lock),
_tick_clock(false), _tick_clock(false),
_timeslice_priority(false), _timeslice_priority(false),
_num_threads(0), _num_threads(num_threads),
_thread_priority(TP_normal), _thread_priority(thread_priority),
_frame_budget(-1.0), _frame_budget(-1.0),
_frame_sync(false), _frame_sync(false),
_num_busy_threads(0), _num_busy_threads(0),
@ -288,6 +289,27 @@ start_threads() {
} }
} }
/**
* Adds the indicated task to the active queue. The task must be inactive, and
* may not have been added to any queue (including the current one).
*/
void AsyncTaskChain::
add(AsyncTask *task) {
nassertv(task->_manager == nullptr && task->_state == AsyncTask::S_inactive);
nassertv(task->is_runnable());
task->_chain_name = get_name();
task->upon_birth(_manager);
if (task_cat.is_debug()) {
task_cat.debug()
<< "Adding " << *task << "\n";
}
MutexHolder holder(_manager->_lock);
do_add(task);
}
/** /**
* Returns true if the indicated task has been added to this AsyncTaskChain, * Returns true if the indicated task has been added to this AsyncTaskChain,
* false otherwise. * false otherwise.

View File

@ -49,7 +49,8 @@ class AsyncTaskManager;
*/ */
class EXPCL_PANDA_EVENT AsyncTaskChain : public TypedReferenceCount, public Namable { class EXPCL_PANDA_EVENT AsyncTaskChain : public TypedReferenceCount, public Namable {
public: public:
AsyncTaskChain(AsyncTaskManager *manager, const std::string &name); AsyncTaskChain(AsyncTaskManager *manager, const std::string &name,
int num_threads=0, ThreadPriority thread_priority=TP_normal);
~AsyncTaskChain(); ~AsyncTaskChain();
PUBLISHED: PUBLISHED:
@ -76,6 +77,12 @@ PUBLISHED:
void start_threads(); void start_threads();
INLINE bool is_started() const; INLINE bool is_started() const;
void add(AsyncTask *task);
#ifndef CPPPARSER
template<class Callable>
INLINE AsyncTask *add(Callable callable, const std::string &name,
int sort = 0, int priority = 0);
#endif
bool has_task(AsyncTask *task) const; bool has_task(AsyncTask *task) const;
BLOCKING void wait_for_tasks(); BLOCKING void wait_for_tasks();

View File

@ -47,23 +47,10 @@ get_clock() {
*/ */
template<class Callable> template<class Callable>
INLINE AsyncTask *AsyncTaskManager:: INLINE AsyncTask *AsyncTaskManager::
add(const std::string &name, Callable callable) { add(Callable callable, const std::string &name, int sort, int priority) {
class InlineTask final : public AsyncTask { AsyncTaskChain *chain = make_task_chain("default");
public: nassertr(chain != nullptr, nullptr);
InlineTask(Callable callable) : _callable(std::move(callable)) {} return chain->add(std::move(callable), name, sort, priority);
ALLOC_DELETED_CHAIN(InlineTask);
private:
virtual DoneStatus do_task() override final {
return _callable(this);
}
Callable _callable;
};
AsyncTask *task = new InlineTask(std::move(callable));
add(task);
return task;
} }
#endif #endif

View File

@ -131,6 +131,17 @@ make_task_chain(const string &name) {
return do_make_task_chain(name); return do_make_task_chain(name);
} }
/**
* Creates a new threaded AsyncTaskChain of the indicated name and stores it
* within the AsyncTaskManager. If a task chain with this name already exists,
* returns it instead.
*/
AsyncTaskChain *AsyncTaskManager::
make_task_chain(const string &name, int num_threads, ThreadPriority thread_priority) {
MutexHolder holder(_lock);
return do_make_task_chain(name, num_threads, thread_priority);
}
/** /**
* Searches a new AsyncTaskChain of the indicated name and returns it if it * Searches a new AsyncTaskChain of the indicated name and returns it if it
* exists, or NULL otherwise. * exists, or NULL otherwise.
@ -562,8 +573,8 @@ write(std::ostream &out, int indent_level) const {
* Assumes the lock is held. * Assumes the lock is held.
*/ */
AsyncTaskChain *AsyncTaskManager:: AsyncTaskChain *AsyncTaskManager::
do_make_task_chain(const string &name) { do_make_task_chain(const string &name, int num_threads, ThreadPriority thread_priority) {
PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name); PT(AsyncTaskChain) chain = new AsyncTaskChain(this, name, num_threads, thread_priority);
TaskChains::const_iterator tci = _task_chains.insert(chain).first; TaskChains::const_iterator tci = _task_chains.insert(chain).first;
return (*tci); return (*tci);

View File

@ -60,13 +60,16 @@ PUBLISHED:
AsyncTaskChain *get_task_chain(int n) const; AsyncTaskChain *get_task_chain(int n) const;
MAKE_SEQ(get_task_chains, get_num_task_chains, get_task_chain); MAKE_SEQ(get_task_chains, get_num_task_chains, get_task_chain);
AsyncTaskChain *make_task_chain(const std::string &name); AsyncTaskChain *make_task_chain(const std::string &name);
AsyncTaskChain *make_task_chain(const std::string &name, int num_threads,
ThreadPriority thread_priority);
AsyncTaskChain *find_task_chain(const std::string &name); AsyncTaskChain *find_task_chain(const std::string &name);
BLOCKING bool remove_task_chain(const std::string &name); BLOCKING bool remove_task_chain(const std::string &name);
void add(AsyncTask *task); void add(AsyncTask *task);
#ifndef CPPPARSER #ifndef CPPPARSER
template<class Callable> template<class Callable>
INLINE AsyncTask *add(const std::string &name, Callable callable); INLINE AsyncTask *add(Callable callable, const std::string &name,
int sort = 0, int priority = 0);
#endif #endif
bool has_task(AsyncTask *task) const; bool has_task(AsyncTask *task) const;
@ -100,7 +103,8 @@ PUBLISHED:
INLINE static AsyncTaskManager *get_global_ptr(); INLINE static AsyncTaskManager *get_global_ptr();
protected: protected:
AsyncTaskChain *do_make_task_chain(const std::string &name); AsyncTaskChain *do_make_task_chain(const std::string &name, int num_threads=0,
ThreadPriority thread_priority=TP_normal);
AsyncTaskChain *do_find_task_chain(const std::string &name); AsyncTaskChain *do_find_task_chain(const std::string &name);
INLINE void add_task_by_name(AsyncTask *task); INLINE void add_task_by_name(AsyncTask *task);

View File

@ -1437,10 +1437,10 @@ create_anim_controls() {
setup_shuttle_button(":", 3, st_forward_button); setup_shuttle_button(":", 3, st_forward_button);
AsyncTaskManager &task_mgr = _panda_framework->get_task_mgr(); AsyncTaskManager &task_mgr = _panda_framework->get_task_mgr();
_update_anim_controls_task = task_mgr.add("controls", [this](AsyncTask *task) { _update_anim_controls_task = task_mgr.add([this](AsyncTask *task) {
update_anim_controls(); update_anim_controls();
return AsyncTask::DS_cont; return AsyncTask::DS_cont;
}); }, "controls");
} }
/** /**

View File

@ -1064,7 +1064,7 @@ async_ensure_ram_image(bool allow_compression, int priority) {
double delay = async_load_delay; double delay = async_load_delay;
// This texture has not yet been queued to be reloaded. Queue it up now. // This texture has not yet been queued to be reloaded. Queue it up now.
task = task_mgr->add(task_name, [=](AsyncTask *task) { task = chain->add([=](AsyncTask *task) {
if (delay != 0.0) { if (delay != 0.0) {
Thread::sleep(delay); Thread::sleep(delay);
} }
@ -1080,9 +1080,8 @@ async_ensure_ram_image(bool allow_compression, int priority) {
do_get_uncompressed_ram_image(cdata); do_get_uncompressed_ram_image(cdata);
} }
return AsyncTask::DS_done; return AsyncTask::DS_done;
}); }, task_name, 0, priority);
task->set_priority(priority);
task->set_task_chain("texture_reload");
cdataw->_reload_task = task; cdataw->_reload_task = task;
return (AsyncFuture *)task; return (AsyncFuture *)task;
} }