diff --git a/direct/src/task/Task.py b/direct/src/task/Task.py index 8181638264..8645e5bc43 100644 --- a/direct/src/task/Task.py +++ b/direct/src/task/Task.py @@ -86,6 +86,8 @@ Task.DtoolClassDict['exit'] = exit pause = AsyncTaskPause Task.DtoolClassDict['pause'] = staticmethod(pause) +gather = Task.gather + def sequence(*taskList): seq = AsyncTaskSequence('sequence') for task in taskList: diff --git a/panda/src/event/asyncFuture.I b/panda/src/event/asyncFuture.I index 07191a34c9..37dca79038 100644 --- a/panda/src/event/asyncFuture.I +++ b/panda/src/event/asyncFuture.I @@ -17,7 +17,6 @@ INLINE AsyncFuture:: AsyncFuture() : _manager(nullptr), - _cvar(nullptr), _future_state(FS_pending), _result(nullptr) { } @@ -28,7 +27,7 @@ AsyncFuture() : */ INLINE bool AsyncFuture:: done() const { - return (FutureState)AtomicAdjust::get(_future_state) != FS_pending; + return (FutureState)AtomicAdjust::get(_future_state) >= FS_finished; } /** @@ -46,6 +45,7 @@ cancelled() const { */ INLINE void AsyncFuture:: set_done_event(const string &done_event) { + nassertv(!done()); _done_event = done_event; } @@ -104,6 +104,97 @@ set_result(TypedReferenceCount *result) { } INLINE void AsyncFuture:: -set_result(TypedWritableReferenceCount *result) { - set_result(result, result); +set_result(const EventParameter &result) { + set_result(result.get_ptr(), result.get_ptr()); +} + +/** + * Creates a new future that returns `done()` when all of the contained + * futures are done. + * + * Calling `cancel()` on the returned future will result in all contained + * futures that have not yet finished to be cancelled. + */ +INLINE AsyncFuture *AsyncFuture:: +gather(Futures futures) { + if (futures.empty()) { + AsyncFuture *fut = new AsyncFuture; + fut->_future_state = (AtomicAdjust::Integer)FS_finished; + return fut; + } else if (futures.size() == 1) { + return futures[0].p(); + } else { + return (AsyncFuture *)new AsyncGatheringFuture(move(futures)); + } +} + +/** + * Tries to atomically lock the future, assuming it is pending. Returns false + * if it is not in the pending state, implying it's either done or about to be + * cancelled. + */ +INLINE bool AsyncFuture:: +try_lock_pending() { + return set_future_state(FS_locked_pending); +} + +/** + * Should be called after try_lock_pending() returns true. + */ +INLINE void AsyncFuture:: +unlock(FutureState new_state) { + nassertv(new_state != FS_locked_pending); + FutureState orig_state = (FutureState)AtomicAdjust::set(_future_state, (AtomicAdjust::Integer)new_state); + nassertv(orig_state == FS_locked_pending); +} + +/** + * Atomically changes the future state from pending to another state. Returns + * true if successful, false if the future was already done. + * Note that once a future is in a "done" state (ie. cancelled or finished) it + * can never change state again. + */ +INLINE bool AsyncFuture:: +set_future_state(FutureState state) { + FutureState orig_state = (FutureState) + AtomicAdjust::compare_and_exchange( + _future_state, + (AtomicAdjust::Integer)FS_pending, + (AtomicAdjust::Integer)state); + + while (orig_state == FS_locked_pending) { + Thread::force_yield(); + orig_state = (FutureState)AtomicAdjust::compare_and_exchange( + _future_state, + (AtomicAdjust::Integer)FS_pending, + (AtomicAdjust::Integer)state); + } + + return orig_state == FS_pending; +} + +/** + * Returns the number of futures that were passed to the constructor. + */ +INLINE size_t AsyncGatheringFuture:: +get_num_futures() const { + return _futures.size(); +} + +/** + * Returns the nth future that was passed into the constructor. + */ +INLINE AsyncFuture *AsyncGatheringFuture:: +get_future(size_t i) const { + nassertr(i < _futures.size(), nullptr); + return _futures[i].p(); +} + +/** + * Returns the result of the nth future that was passed into the constructor. + */ +INLINE TypedObject *AsyncGatheringFuture:: +get_result(size_t i) const { + nassertr(i < _futures.size(), nullptr); + return _futures[i]->get_result(); } diff --git a/panda/src/event/asyncFuture.cxx b/panda/src/event/asyncFuture.cxx index bb299239d7..f1aaaba842 100644 --- a/panda/src/event/asyncFuture.cxx +++ b/panda/src/event/asyncFuture.cxx @@ -20,31 +20,33 @@ #include "throw_event.h" TypeHandle AsyncFuture::_type_handle; +TypeHandle AsyncGatheringFuture::_type_handle; /** * Destroys the future. Assumes notify_done() has already been called. */ AsyncFuture:: ~AsyncFuture() { - delete _cvar; - nassertv(_waiting_tasks.empty()); + // If this triggers, the future destroyed before it was cancelled, which is + // not valid. Unless we should simply call cancel() here? + nassertv(_waiting.empty()); } /** * Cancels the future. Returns true if it was cancelled, or false if the - * future was already done. + * future was already done. Either way, done() will return true after this + * call returns. + * * In the case of a task, this is equivalent to remove(). */ bool AsyncFuture:: cancel() { - if (!done()) { - if (_manager == nullptr) { - _manager = AsyncTaskManager::get_global_ptr(); - } - MutexHolder holder(_manager->_lock); + if (set_future_state(FS_cancelled)) { + // The compare-swap operation succeeded, so schedule the callbacks. notify_done(false); return true; } else { + // It's already done. return false; } } @@ -55,6 +57,22 @@ cancel() { void AsyncFuture:: output(ostream &out) const { out << get_type(); + FutureState state = (FutureState)AtomicAdjust::get(_future_state); + switch (state) { + case FS_pending: + case FS_locked_pending: + out << " (pending)"; + break; + case FS_finished: + out << " (finished)"; + break; + case FS_cancelled: + out << " (cancelled)"; + break; + default: + out << " (**INVALID**)"; + break; + } } /** @@ -66,25 +84,18 @@ wait() { return; } - // If we don't have a manager, use the global one. - if (_manager == nullptr) { - _manager = AsyncTaskManager::get_global_ptr(); + PStatTimer timer(AsyncTaskChain::_wait_pcollector); + if (task_cat.is_debug()) { + task_cat.debug() + << "Waiting for future " << *this << "\n"; } - MutexHolder holder(_manager->_lock); - if (!done()) { - if (_cvar == nullptr) { - _cvar = new ConditionVarFull(_manager->_lock); - } - if (task_cat.is_debug()) { - task_cat.debug() - << "Waiting for future " << *this << "\n"; - } - PStatTimer timer(AsyncTaskChain::_wait_pcollector); - while (!done()) { - _cvar->wait(); - } - } + // Continue to yield while the future isn't done. It may be more efficient + // to use a condition variable, but let's not add the extra complexity + // unless we're sure that we need it. + do { + Thread::force_yield(); + } while (!done()); } /** @@ -96,46 +107,52 @@ wait(double timeout) { return; } - // If we don't have a manager, use the global one. - if (_manager == nullptr) { - _manager = AsyncTaskManager::get_global_ptr(); + PStatTimer timer(AsyncTaskChain::_wait_pcollector); + if (task_cat.is_debug()) { + task_cat.debug() + << "Waiting up to " << timeout << " seconds for future " << *this << "\n"; } - MutexHolder holder(_manager->_lock); - if (!done()) { - if (_cvar == nullptr) { - _cvar = new ConditionVarFull(_manager->_lock); - } - if (task_cat.is_debug()) { - task_cat.debug() - << "Waiting up to " << timeout << " seconds for future " << *this << "\n"; - } - PStatTimer timer(AsyncTaskChain::_wait_pcollector); - _cvar->wait(timeout); - } + // Continue to yield while the future isn't done. It may be more efficient + // to use a condition variable, but let's not add the extra complexity + // unless we're sure that we need it. + ClockObject *clock = ClockObject::get_global_clock(); + double end = clock->get_real_time() + timeout; + do { + Thread::force_yield(); + } while (!done() && clock->get_real_time() < end); } /** - * Schedules the done callbacks. Assumes the manager's lock is held, and that - * the future is currently in the 'pending' state. + * Schedules the done callbacks. Called after the future has just entered the + * 'done' state. * @param clean_exit true if finished successfully, false if cancelled. */ void AsyncFuture:: notify_done(bool clean_exit) { - nassertv(_manager != nullptr); - nassertv(_manager->_lock.debug_is_locked()); - nassertv(_future_state == FS_pending); + nassertv(done()); - pvector::iterator it; - for (it = _waiting_tasks.begin(); it != _waiting_tasks.end(); ++it) { - AsyncTask *task = *it; - nassertd(task->_manager == _manager) continue; - task->_state = AsyncTask::S_active; - task->_chain->_active.push_back(task); - --task->_chain->_num_awaiting_tasks; - unref_delete(task); + // This will only be called by the thread that managed to set the + // _future_state away from the "pending" state, so this is thread safe. + + Futures::iterator it; + for (it = _waiting.begin(); it != _waiting.end(); ++it) { + AsyncFuture *fut = *it; + if (fut->is_task()) { + // It's a task. Make it active again. + wake_task((AsyncTask *)fut); + } else { + // It's a gathering future. Decrease the pending count on it, and if + // we're the last one, call notify_done() on it. + AsyncGatheringFuture *gather = (AsyncGatheringFuture *)fut; + if (!AtomicAdjust::dec(gather->_num_pending)) { + if (gather->set_future_state(FS_finished)) { + gather->notify_done(true); + } + } + } } - _waiting_tasks.clear(); + _waiting.clear(); // For historical reasons, we don't send the "done event" if the future was // cancelled. @@ -144,17 +161,6 @@ notify_done(bool clean_exit) { event->add_parameter(EventParameter(this)); throw_event(move(event)); } - - nassertv_always(FS_pending == - (FutureState)AtomicAdjust::compare_and_exchange( - _future_state, - (AtomicAdjust::Integer)FS_pending, - (AtomicAdjust::Integer)(clean_exit ? FS_finished : FS_cancelled))); - - // Finally, notify any threads that may be waiting. - if (_cvar != nullptr) { - _cvar->notify_all(); - } } /** @@ -168,28 +174,203 @@ notify_done(bool clean_exit) { */ void AsyncFuture:: set_result(TypedObject *ptr, ReferenceCount *ref_ptr) { - nassertv(!done()); - // If we don't have a manager, use the global one. - if (_manager == nullptr) { - _manager = AsyncTaskManager::get_global_ptr(); + // We don't strictly need to lock the future since only one thread is + // allowed to call set_result(), but we might as well. + FutureState orig_state = (FutureState)AtomicAdjust:: + compare_and_exchange(_future_state, (AtomicAdjust::Integer)FS_pending, + (AtomicAdjust::Integer)FS_locked_pending); + + while (orig_state == FS_locked_pending) { + Thread::force_yield(); + orig_state = (FutureState)AtomicAdjust:: + compare_and_exchange(_future_state, (AtomicAdjust::Integer)FS_pending, + (AtomicAdjust::Integer)FS_locked_pending); + } + + if (orig_state == FS_pending) { + _result = ptr; + _result_ref = ref_ptr; + unlock(FS_finished); + + // OK, now our thread owns the _waiting vector et al. + notify_done(true); + + } else if (orig_state == FS_cancelled) { + // This was originally illegal, but there is a chance that the future was + // cancelled while another thread was setting the result. So, we drop + // this, but we can issue a warning. + task_cat.warning() + << "Ignoring set_result() called on cancelled " << *this << "\n"; + + } else { + task_cat.error() + << "set_result() was called on finished " << *this << "\n"; } - MutexHolder holder(_manager->_lock); - _result = ptr; - _result_ref = ref_ptr; - notify_done(true); } /** * Indicates that the given task is waiting for this future to complete. When - * the future is done, it will reactivate the given task. - * Assumes the manager's lock is held. + * the future is done, it will reactivate the given task. If this is called + * while the future is already done, schedules the task immediately. + * Assumes the manager's lock is not held. + * @returns true if the future was pending, false if it was already done. + */ +bool AsyncFuture:: +add_waiting_task(AsyncTask *task) { + nassertr(task->is_runnable(), false); + + // We have to make sure we're not going to change state while we're in the + // process of adding the task. + if (try_lock_pending()) { + if (_manager == nullptr) { + _manager = task->_manager; + } + + _waiting.push_back(task); + + // Unlock the state. + unlock(); + nassertr(task->_manager == nullptr || task->_manager == _manager, true); + return true; + } else { + // It's already done. Wake the task immediately. + wake_task(task); + return false; + } +} + +/** + * Reactivates the given task. Assumes the manager lock is not held. */ void AsyncFuture:: -add_waiting_task(AsyncTask *task) { - nassertv(!done()); - nassertv(_manager != nullptr); - nassertv(_manager->_lock.debug_is_locked()); - task->ref(); - _waiting_tasks.push_back(task); - nassertv(task->_manager == _manager); +wake_task(AsyncTask *task) { + cerr << "waking task\n"; + AsyncTaskManager *manager = task->_manager; + if (manager == nullptr) { + // If it's an unscheduled task, schedule it on the same manager as the + // rest of the waiting tasks. + manager = _manager; + if (manager == nullptr) { + manager = AsyncTaskManager::get_global_ptr(); + } + } + + MutexHolder holder(manager->_lock); + switch (task->_state) { + case AsyncTask::S_servicing_removed: + nassertv(task->_manager == _manager); + // Re-adding a self-removed task; this just means clearing the removed + // flag. + task->_state = AsyncTask::S_servicing; + return; + + case AsyncTask::S_inactive: + // Schedule it immediately. + nassertv(task->_manager == nullptr); + + if (task_cat.is_debug()) { + task_cat.debug() + << "Adding " << *task << " (woken by future " << *this << ")\n"; + } + + { + manager->_lock.release(); + task->upon_birth(manager); + manager->_lock.acquire(); + nassertv(task->_manager == nullptr && + task->_state == AsyncTask::S_inactive); + + AsyncTaskChain *chain = manager->do_find_task_chain(task->_chain_name); + if (chain == nullptr) { + task_cat.warning() + << "Creating implicit AsyncTaskChain " << task->_chain_name + << " for " << manager->get_type() << " " << manager->get_name() << "\n"; + chain = manager->do_make_task_chain(task->_chain_name); + } + chain->do_add(task); + } + return; + + case AsyncTask::S_awaiting: + nassertv(task->_manager == _manager); + task->_state = AsyncTask::S_active; + task->_chain->_active.push_back(task); + --task->_chain->_num_awaiting_tasks; + return; + + default: + nassertv(false); + return; + } +} + +/** + * @see AsyncFuture::gather + */ +AsyncGatheringFuture:: +AsyncGatheringFuture(AsyncFuture::Futures futures) : + _futures(move(futures)), + _num_pending(0) { + + bool any_pending = false; + + AsyncFuture::Futures::const_iterator it; + for (it = _futures.begin(); it != _futures.end(); ++it) { + AsyncFuture *fut = *it; + // If this returns true, the future is not yet done and we need to + // register ourselves with it. This creates a circular reference, but it + // is resolved when the future is completed or cancelled. + if (fut->try_lock_pending()) { + if (_manager == nullptr) { + _manager = fut->_manager; + } + fut->_waiting.push_back((AsyncFuture *)this); + AtomicAdjust::inc(_num_pending); + fut->unlock(); + any_pending = true; + } + } + if (!any_pending) { + // Start in the done state if all the futures we were passed are done. + // Note that it is only safe to set this member in this manner if indeed + // no other future holds a reference to us. + _future_state = (AtomicAdjust::Integer)FS_finished; + } +} + +/** + * Cancels all the futures. Returns true if any futures were cancelled. + * Makes sure that all the futures finish before this one is marked done, in + * order to maintain the guarantee that calling result() is safe when done() + * returns true. + */ +bool AsyncGatheringFuture:: +cancel() { + if (!done()) { + // Temporarily increase the pending count so that the notify_done() + // callbacks won't end up causing it to be set to "finished". + AtomicAdjust::inc(_num_pending); + + bool any_cancelled = false; + AsyncFuture::Futures::const_iterator it; + for (it = _futures.begin(); it != _futures.end(); ++it) { + AsyncFuture *fut = *it; + if (fut->cancel()) { + any_cancelled = true; + } + } + + // Now change state to "cancelled" and call the notify_done() callbacks. + // Don't call notify_done() if another thread has beaten us to it. + if (set_future_state(FS_cancelled)) { + notify_done(false); + } + + // Decreasing the pending count is kind of pointless now, so we do it only + // in a debug build. + nassertr(!AtomicAdjust::dec(_num_pending), any_cancelled); + return any_cancelled; + } else { + return false; + } } diff --git a/panda/src/event/asyncFuture.h b/panda/src/event/asyncFuture.h index 456c872c04..e3a835eea3 100644 --- a/panda/src/event/asyncFuture.h +++ b/panda/src/event/asyncFuture.h @@ -17,7 +17,7 @@ #include "pandabase.h" #include "typedReferenceCount.h" #include "typedWritableReferenceCount.h" -#include "conditionVar.h" +#include "eventParameter.h" #include "atomicAdjust.h" class AsyncTaskManager; @@ -30,21 +30,29 @@ class ConditionVarFull; * as well as register callbacks for this future's completion. * * An AsyncFuture can be awaited from within a coroutine or task. It keeps - * track of a list of tasks waiting for this future so that they are - * automatically reactivated upon this future's completion. + * track of tasks waiting for this future and automatically reactivates them + * upon this future's completion. * * A task itself is also a subclass of AsyncFuture. Other subclasses are - * possible, although subclassing is not necessary for most purposes. + * not generally necessary, except to override the function of `cancel()`. * - * The `done()` method is used to check whether the future has completed and - * a result is available (whether it is cancelled or finished). + * Until the future is done, it is "owned" by the resolver thread, though it's + * still legal for other threads to query its state. When the resolver thread + * resolves this future using `set_result()`, or any thread calls `cancel()`, + * it instantly enters the "done" state, after which the result becomes a + * read-only field that all threads can access. * - * To get the result of the future in C++, use `wait()` and `get_result()`. - * In Python, the functionality of both of those calls is wrapped into the - * `result()` method, which waits for the future to complete before either - * returning the result or throwing an exception if the future was cancelled. + * When the future returns true for done(), a thread can use cancelled() to + * determine whether the future was cancelled or get_result() to access the + * result of the operation. Not all operations define a meaningful result + * value, so some will always return nullptr. + * + * In Python, the `cancelled()`, `wait()` and `get_result()` methods are + * wrapped up into a single `result()` method which waits for the future to + * complete before either returning the result or throwing an exception if the + * future was cancelled. * However, it is preferable to use the `await` keyword when running from a - * coroutine. + * coroutine, which only suspends the current task and not the entire thread. * * This API aims to mirror and be compatible with Python's Future class. */ @@ -58,7 +66,7 @@ PUBLISHED: INLINE bool done() const; INLINE bool cancelled() const; - EXTENSION(PyObject *result(double timeout = -1.0) const); + EXTENSION(PyObject *result(PyObject *timeout = Py_None) const); virtual bool cancel(); @@ -66,45 +74,63 @@ PUBLISHED: INLINE const string &get_done_event() const; MAKE_PROPERTY(done_event, get_done_event, set_done_event); + EXTENSION(PyObject *add_done_callback(PyObject *self, PyObject *fn)); + + EXTENSION(static PyObject *gather(PyObject *args)); + virtual void output(ostream &out) const; - void wait(); - void wait(double timeout); + BLOCKING void wait(); + BLOCKING void wait(double timeout); INLINE void set_result(nullptr_t); INLINE void set_result(TypedObject *result); INLINE void set_result(TypedReferenceCount *result); - INLINE void set_result(TypedWritableReferenceCount *result); + INLINE void set_result(const EventParameter &result); public: + void set_result(TypedObject *ptr, ReferenceCount *ref_ptr); + INLINE TypedObject *get_result() const; INLINE void get_result(TypedObject *&ptr, ReferenceCount *&ref_ptr) const; + typedef pvector Futures; + INLINE static AsyncFuture *gather(Futures futures); + + virtual bool is_task() const {return false;} + void notify_done(bool clean_exit); + bool add_waiting_task(AsyncTask *task); private: - void set_result(TypedObject *ptr, ReferenceCount *ref_ptr); - void add_waiting_task(AsyncTask *task); + void wake_task(AsyncTask *task); protected: enum FutureState { + // Pending states FS_pending, + FS_locked_pending, + + // Done states FS_finished, FS_cancelled, }; + INLINE bool try_lock_pending(); + INLINE void unlock(FutureState new_state = FS_pending); + INLINE bool set_future_state(FutureState state); AsyncTaskManager *_manager; - ConditionVarFull *_cvar; TypedObject *_result; PT(ReferenceCount) _result_ref; AtomicAdjust::Integer _future_state; string _done_event; - // Tasks waiting for this one to complete. These are reference counted, but - // we can't store them in a PointerTo for circular dependency reasons. - pvector _waiting_tasks; + // Tasks and gathering futures waiting for this one to complete. + Futures _waiting; + friend class AsyncGatheringFuture; + friend class AsyncTaskChain; friend class PythonTask; public: @@ -130,6 +156,44 @@ INLINE ostream &operator << (ostream &out, const AsyncFuture &fut) { return out; }; +/** + * Specific future that collects the results of several futures. + */ +class EXPCL_PANDA_EVENT AsyncGatheringFuture FINAL : public AsyncFuture { +private: + AsyncGatheringFuture(AsyncFuture::Futures futures); + +public: + virtual bool cancel() override; + + INLINE size_t get_num_futures() const; + INLINE AsyncFuture *get_future(size_t i) const; + INLINE TypedObject *get_result(size_t i) const; + +private: + const Futures _futures; + AtomicAdjust::Integer _num_pending; + + friend class AsyncFuture; + +public: + static TypeHandle get_class_type() { + return _type_handle; + } + static void init_type() { + AsyncFuture::init_type(); + register_type(_type_handle, "AsyncGatheringFuture", + AsyncFuture::get_class_type()); + } + virtual TypeHandle get_type() const { + return get_class_type(); + } + virtual TypeHandle force_init_type() {init_type(); return get_class_type();} + +private: + static TypeHandle _type_handle; +}; + #include "asyncFuture.I" #endif diff --git a/panda/src/event/asyncFuture_ext.cxx b/panda/src/event/asyncFuture_ext.cxx index 49b1ab4d66..3150c09d7d 100644 --- a/panda/src/event/asyncFuture_ext.cxx +++ b/panda/src/event/asyncFuture_ext.cxx @@ -12,12 +12,16 @@ */ #include "asyncFuture_ext.h" +#include "asyncTaskSequence.h" +#include "eventParameter.h" +#include "paramValue.h" #include "pythonTask.h" #ifdef HAVE_PYTHON #ifndef CPPPARSER extern struct Dtool_PyTypedObject Dtool_AsyncFuture; +extern struct Dtool_PyTypedObject Dtool_ParamValueBase; extern struct Dtool_PyTypedObject Dtool_TypedObject; #endif @@ -32,7 +36,45 @@ static PyObject *get_done_result(const AsyncFuture *future) { // any PyObject value or raise an exception. const PythonTask *task = (const PythonTask *)future; return task->get_result(); + + } else if (future->is_of_type(AsyncTaskSequence::get_class_type())) { + // If it's an AsyncTaskSequence, get the result for each task. + const AsyncTaskSequence *task = (const AsyncTaskSequence *)future; + Py_ssize_t num_tasks = (Py_ssize_t)task->get_num_tasks(); + PyObject *results = PyTuple_New(num_tasks); + + for (Py_ssize_t i = 0; i < num_tasks; ++i) { + PyObject *result = get_done_result(task->get_task(i)); + if (result != nullptr) { + // This steals a reference. + PyTuple_SET_ITEM(results, i, result); + } else { + Py_DECREF(results); + return nullptr; + } + } + return results; + + } else if (future->is_of_type(AsyncGatheringFuture::get_class_type())) { + // If it's an AsyncGatheringFuture, get the result for each future. + const AsyncGatheringFuture *gather = (const AsyncGatheringFuture *)future; + Py_ssize_t num_futures = (Py_ssize_t)gather->get_num_futures(); + PyObject *results = PyTuple_New(num_futures); + + for (Py_ssize_t i = 0; i < num_futures; ++i) { + PyObject *result = get_done_result(gather->get_future((size_t)i)); + if (result != nullptr) { + // This steals a reference. + PyTuple_SET_ITEM(results, i, result); + } else { + Py_DECREF(results); + return nullptr; + } + } + return results; + } else { + // It's any other future. ReferenceCount *ref_ptr; TypedObject *ptr; future->get_result(ptr, ref_ptr); @@ -42,13 +84,36 @@ static PyObject *get_done_result(const AsyncFuture *future) { return Py_None; } + TypeHandle type = ptr->get_type(); + if (type.is_derived_from(ParamValueBase::get_class_type())) { + // If this is a ParamValueBase, return the 'value' property. + // EventStoreInt and Double are not exposed to Python for some reason. + if (type == EventStoreInt::get_class_type()) { + return Dtool_WrapValue(((EventStoreInt *)ptr)->get_value()); + } else if (type == EventStoreDouble::get_class_type()) { + return Dtool_WrapValue(((EventStoreDouble *)ptr)->get_value()); + } + + ParamValueBase *value = (ParamValueBase *)ptr; + PyObject *wrap = DTool_CreatePyInstanceTyped + ((void *)value, Dtool_ParamValueBase, false, false, type.get_index()); + if (wrap != nullptr) { + PyObject *value = PyObject_GetAttrString(wrap, "value"); + if (value != nullptr) { + return value; + } + PyErr_Restore(nullptr, nullptr, nullptr); + Py_DECREF(wrap); + } + } + if (ref_ptr != nullptr) { ref_ptr->ref(); } return DTool_CreatePyInstanceTyped ((void *)ptr, Dtool_TypedObject, (ref_ptr != nullptr), false, - ptr->get_type_index()); + type.get_index()); } } else { // If the future was cancelled, we should raise an exception. @@ -62,8 +127,8 @@ static PyObject *get_done_result(const AsyncFuture *future) { } // If we can't get that, we should pretend and make our own. if (exc_type == nullptr) { - exc_type = PyErr_NewExceptionWithDoc("concurrent.futures._base.CancelledError", - "The Future was cancelled.", + exc_type = PyErr_NewExceptionWithDoc((char*)"concurrent.futures._base.CancelledError", + (char*)"The Future was cancelled.", nullptr, nullptr); } } @@ -121,7 +186,7 @@ __await__(PyObject *self) { * raises TimeoutError. */ PyObject *Extension:: -result(double timeout) const { +result(PyObject *timeout) const { if (!_this->done()) { // Not yet done? Wait until it is done, or until a timeout occurs. But // first check to make sure we're not trying to deadlock the thread. @@ -136,11 +201,15 @@ result(double timeout) const { PyThreadState *_save; Py_UNBLOCK_THREADS #endif - //TODO: check why gcc and clang don't like infinity timeout. - if (cinf(timeout) || timeout < 0.0) { + if (timeout == Py_None) { _this->wait(); } else { - _this->wait(timeout); + PyObject *num = PyNumber_Float(timeout); + if (num != nullptr) { + _this->wait(PyFloat_AS_DOUBLE(num)); + } else { + return Dtool_Raise_ArgTypeError(timeout, 0, "result", "float"); + } } #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) Py_BLOCK_THREADS @@ -158,8 +227,8 @@ result(double timeout) const { } // If we can't get that, we should pretend and make our own. if (exc_type == nullptr) { - exc_type = PyErr_NewExceptionWithDoc("concurrent.futures._base.TimeoutError", - "The operation exceeded the given deadline.", + exc_type = PyErr_NewExceptionWithDoc((char*)"concurrent.futures._base.TimeoutError", + (char*)"The operation exceeded the given deadline.", nullptr, nullptr); } } @@ -172,4 +241,78 @@ result(double timeout) const { return get_done_result(_this); } +/** + * Schedules the given function to be run as soon as the future is complete. + * This is also called if the future is cancelled. + * If the future is already done, the callback is scheduled right away. + */ +PyObject *Extension:: +add_done_callback(PyObject *self, PyObject *fn) { + if (!PyCallable_Check(fn)) { + return Dtool_Raise_ArgTypeError(fn, 0, "add_done_callback", "callable"); + } + + PythonTask *task = new PythonTask(fn); + Py_DECREF(task->_args); + task->_args = PyTuple_Pack(1, self); + task->_append_task = false; + task->_ignore_return = true; + + // If this is an AsyncTask, make sure it is scheduled on the same chain. + if (_this->is_task()) { + AsyncTask *this_task = (AsyncTask *)_this; + task->set_task_chain(this_task->get_task_chain()); + } + + _this->add_waiting_task(task); + + Py_INCREF(Py_None); + return Py_None; +} + +/** + * Creates a new future that returns `done()` when all of the contained + * futures are done. + * + * Calling `cancel()` on the returned future will result in all contained + * futures that have not yet finished to be cancelled. + */ +PyObject *Extension:: +gather(PyObject *args) { + if (!PyTuple_Check(args)) { + return Dtool_Raise_TypeError("args is not a tuple"); + } + + Py_ssize_t size = Py_SIZE(args); + AsyncFuture::Futures futures; + futures.reserve(size); + + for (Py_ssize_t i = 0; i < size; ++i) { + PyObject *item = PyTuple_GET_ITEM(args, i); + if (DtoolInstance_Check(item)) { + AsyncFuture *fut = (AsyncFuture *)DtoolInstance_UPCAST(item, Dtool_AsyncFuture); + if (fut != nullptr) { + futures.push_back(fut); + continue; + } +#if PY_VERSION_HEX >= 0x03050000 + } else if (PyCoro_CheckExact(item)) { + // We allow passing in a coroutine instead of a future. This causes it + // to be scheduled as a task. + futures.push_back(new PythonTask(item)); + continue; +#endif + } + return Dtool_Raise_ArgTypeError(item, i, "gather", "coroutine, task or future"); + } + + AsyncFuture *future = AsyncFuture::gather(move(futures)); + if (future != nullptr) { + future->ref(); + return DTool_CreatePyInstanceTyped((void *)future, Dtool_AsyncFuture, true, false, future->get_type_index()); + } else { + return PyErr_NoMemory(); + } +} + #endif diff --git a/panda/src/event/asyncFuture_ext.h b/panda/src/event/asyncFuture_ext.h index 498225478d..21786ee17b 100644 --- a/panda/src/event/asyncFuture_ext.h +++ b/panda/src/event/asyncFuture_ext.h @@ -29,7 +29,11 @@ public: static PyObject *__await__(PyObject *self); static PyObject *__iter__(PyObject *self) { return __await__(self); } - PyObject *result(double timeout = -1.0) const; + PyObject *result(PyObject *timeout = Py_None) const; + + PyObject *add_done_callback(PyObject *self, PyObject *fn); + + static PyObject *gather(PyObject *args); }; #endif // HAVE_PYTHON diff --git a/panda/src/event/asyncTask.h b/panda/src/event/asyncTask.h index a941cd4b36..d514b2f5a3 100644 --- a/panda/src/event/asyncTask.h +++ b/panda/src/event/asyncTask.h @@ -104,6 +104,7 @@ protected: DoneStatus unlock_and_do_task(); virtual bool cancel() FINAL; + virtual bool is_task() const FINAL {return true;} virtual bool is_runnable(); virtual DoneStatus do_task(); diff --git a/panda/src/event/asyncTaskChain.cxx b/panda/src/event/asyncTaskChain.cxx index bda6113bd1..47299d31ef 100644 --- a/panda/src/event/asyncTaskChain.cxx +++ b/panda/src/event/asyncTaskChain.cxx @@ -778,7 +778,8 @@ cleanup_task(AsyncTask *task, bool upon_death, bool clean_exit) { _manager->remove_task_by_name(task); - if (upon_death && !task->done()) { + if (upon_death && task->set_future_state(clean_exit ? AsyncFuture::FS_finished + : AsyncFuture::FS_cancelled)) { task->notify_done(clean_exit); } diff --git a/panda/src/event/config_event.cxx b/panda/src/event/config_event.cxx index 86a8df8361..85727d2d7c 100644 --- a/panda/src/event/config_event.cxx +++ b/panda/src/event/config_event.cxx @@ -33,6 +33,7 @@ NotifyCategoryDef(task, ""); ConfigureFn(config_event) { AsyncFuture::init_type(); + AsyncGatheringFuture::init_type(); AsyncTask::init_type(); AsyncTaskChain::init_type(); AsyncTaskManager::init_type(); diff --git a/panda/src/event/pythonTask.cxx b/panda/src/event/pythonTask.cxx index bda2706407..996587885e 100644 --- a/panda/src/event/pythonTask.cxx +++ b/panda/src/event/pythonTask.cxx @@ -45,6 +45,7 @@ PythonTask(PyObject *func_or_coro, const string &name) : _exc_traceback(nullptr), _generator(nullptr), _future_done(nullptr), + _ignore_return(false), _retrieved_exception(false) { nassertv(func_or_coro != nullptr); @@ -169,9 +170,7 @@ get_args() { } this->ref(); - PyObject *self = - DTool_CreatePyInstanceTyped(this, Dtool_TypedReferenceCount, - true, false, get_type_index()); + PyObject *self = DTool_CreatePyInstance(this, Dtool_PythonTask, true, false); PyTuple_SET_ITEM(with_task, num_args, self); return with_task; @@ -588,20 +587,21 @@ do_python_task() { AsyncFuture *fut = (AsyncFuture *)DtoolInstance_UPCAST(result, Dtool_AsyncFuture); if (fut != nullptr) { // Suspend execution of this task until this other task has completed. - AsyncTaskManager *manager = fut->_manager; - if (manager == nullptr) { - manager = _manager; - fut->_manager = manager; - } - nassertr(manager == _manager, DS_interrupt); - MutexHolder holder(manager->_lock); - if (fut != (AsyncFuture *)this) { - if (!fut->done()) { + if (fut != (AsyncFuture *)this && !fut->done()) { + if (fut->is_task()) { + // This is actually a task, do we need to schedule it with the + // manager? This allows doing something like + // await Task.pause(1.0) + // directly instead of having to do: + // await taskMgr.add(Task.pause(1.0)) + AsyncTask *task = (AsyncTask *)fut; + _manager->add(task); + } + if (fut->add_waiting_task(this)) { if (task_cat.is_debug()) { task_cat.debug() << *this << " is now awaiting <" << *fut << ">.\n"; } - fut->add_waiting_task(this); } else { // The task is already done. Continue at next opportunity. if (task_cat.is_debug()) { @@ -664,7 +664,7 @@ do_python_task() { return DS_interrupt; } - if (result == Py_None) { + if (result == Py_None || _ignore_return) { Py_DECREF(result); return DS_done; } @@ -861,15 +861,10 @@ void PythonTask:: call_function(PyObject *function) { if (function != Py_None) { this->ref(); - PyObject *self = - DTool_CreatePyInstanceTyped(this, Dtool_TypedReferenceCount, - true, false, get_type_index()); - PyObject *args = Py_BuildValue("(O)", self); - Py_DECREF(self); - - PyObject *result = PyObject_CallObject(function, args); + PyObject *self = DTool_CreatePyInstance(this, Dtool_PythonTask, true, false); + PyObject *result = PyObject_CallFunctionObjArgs(function, self, nullptr); Py_XDECREF(result); - Py_DECREF(args); + Py_DECREF(self); } } diff --git a/panda/src/event/pythonTask.h b/panda/src/event/pythonTask.h index 869a0691ab..48826769de 100644 --- a/panda/src/event/pythonTask.h +++ b/panda/src/event/pythonTask.h @@ -26,7 +26,7 @@ * This class exists to allow association of a Python function or coroutine * with the AsyncTaskManager. */ -class PythonTask : public AsyncTask { +class PythonTask FINAL : public AsyncTask { PUBLISHED: PythonTask(PyObject *function = Py_None, const string &name = string()); virtual ~PythonTask(); @@ -123,6 +123,7 @@ private: PyObject *_future_done; bool _append_task; + bool _ignore_return; bool _registered_to_owner; mutable bool _retrieved_exception; diff --git a/tests/event/test_futures.py b/tests/event/test_futures.py index c353834257..0778605da4 100644 --- a/tests/event/test_futures.py +++ b/tests/event/test_futures.py @@ -159,6 +159,105 @@ def test_coro_exception(): task.result() +def test_future_gather(): + fut1 = core.AsyncFuture() + fut2 = core.AsyncFuture() + + # 0 and 1 arguments are special + assert core.AsyncFuture.gather().done() + assert core.AsyncFuture.gather(fut1) == fut1 + + # Gathering not-done futures + gather = core.AsyncFuture.gather(fut1, fut2) + assert not gather.done() + + # One future done + fut1.set_result(1) + assert not gather.done() + + # Two futures done + fut2.set_result(2) + assert gather.done() + + assert not gather.cancelled() + assert tuple(gather.result()) == (1, 2) + + +def test_future_gather_cancel_inner(): + fut1 = core.AsyncFuture() + fut2 = core.AsyncFuture() + + # Gathering not-done futures + gather = core.AsyncFuture.gather(fut1, fut2) + assert not gather.done() + + # One future cancelled + fut1.cancel() + assert not gather.done() + + # Two futures cancelled + fut2.set_result(2) + assert gather.done() + + assert not gather.cancelled() + with pytest.raises(CancelledError): + assert gather.result() + + +def test_future_gather_cancel_outer(): + fut1 = core.AsyncFuture() + fut2 = core.AsyncFuture() + + # Gathering not-done futures + gather = core.AsyncFuture.gather(fut1, fut2) + assert not gather.done() + + assert gather.cancel() + assert gather.done() + assert gather.cancelled() + + with pytest.raises(CancelledError): + assert gather.result() + + +def test_future_done_callback(): + fut = core.AsyncFuture() + + # Use the list hack since Python 2 doesn't have the "nonlocal" keyword. + called = [False] + def on_done(arg): + assert arg == fut + called[0] = True + + fut.add_done_callback(on_done) + fut.cancel() + assert fut.done() + + task_mgr = core.AsyncTaskManager.get_global_ptr() + task_mgr.poll() + assert called[0] + + +def test_future_done_callback_already_done(): + # Same as above, but with the future already done when add_done_callback + # is called. + fut = core.AsyncFuture() + fut.cancel() + assert fut.done() + + # Use the list hack since Python 2 doesn't have the "nonlocal" keyword. + called = [False] + def on_done(arg): + assert arg == fut + called[0] = True + + fut.add_done_callback(on_done) + + task_mgr = core.AsyncTaskManager.get_global_ptr() + task_mgr.poll() + assert called[0] + + def test_event_future(): queue = core.EventQueue() handler = core.EventHandler(queue)