From cb8563acac8c738f9213214bf35f32ee6e14ba45 Mon Sep 17 00:00:00 2001 From: rdb Date: Wed, 23 Feb 2022 21:25:43 +0100 Subject: [PATCH] event: Update AsyncFuture to use new atomics implementation With explicit barriers, and the non-timeout version of wait() is now significantly more efficient by using the new futexes if available --- panda/src/audio/audioLoadRequest.I | 5 +- panda/src/event/asyncFuture.I | 45 +++++--- panda/src/event/asyncFuture.cxx | 142 ++++++++++++++---------- panda/src/event/asyncFuture.h | 9 +- panda/src/gobj/animateVerticesRequest.I | 2 +- panda/src/gobj/textureReloadRequest.I | 2 +- panda/src/pgraph/modelFlattenRequest.I | 2 +- panda/src/pgraph/modelLoadRequest.I | 2 +- panda/src/pgraph/modelSaveRequest.I | 2 +- 9 files changed, 125 insertions(+), 86 deletions(-) diff --git a/panda/src/audio/audioLoadRequest.I b/panda/src/audio/audioLoadRequest.I index 019501aa77..f5d9b8f5f8 100644 --- a/panda/src/audio/audioLoadRequest.I +++ b/panda/src/audio/audioLoadRequest.I @@ -59,7 +59,7 @@ get_positional() const { */ INLINE bool AudioLoadRequest:: is_ready() const { - return (FutureState)AtomicAdjust::get(_future_state) == FS_finished; + return (FutureState)_future_state.load(std::memory_order_relaxed) == FS_finished; } /** @@ -70,6 +70,5 @@ is_ready() const { */ INLINE AudioSound *AudioLoadRequest:: get_sound() const { - nassertr_always(done(), nullptr); - return (AudioSound *)_result; + return (AudioSound *)get_result(); } diff --git a/panda/src/event/asyncFuture.I b/panda/src/event/asyncFuture.I index c38b04d880..201e103c3e 100644 --- a/panda/src/event/asyncFuture.I +++ b/panda/src/event/asyncFuture.I @@ -27,7 +27,9 @@ AsyncFuture() : */ INLINE bool AsyncFuture:: done() const { - return (FutureState)AtomicAdjust::get(_future_state) >= FS_finished; + // Not an acquire barrier because the caller may not even care about the + // result. Instead, a fence is put in get_result(). + return _future_state.load(std::memory_order_relaxed) >= FS_finished; } /** @@ -35,7 +37,7 @@ done() const { */ INLINE bool AsyncFuture:: cancelled() const { - return (FutureState)AtomicAdjust::get(_future_state) == FS_cancelled; + return _future_state.load(std::memory_order_relaxed) == FS_cancelled; } /** @@ -65,7 +67,7 @@ INLINE TypedObject *AsyncFuture:: get_result() const { // This is thread safe, since _result may no longer be modified after the // state is changed to "done". - nassertr_always(done(), nullptr); + nassertr_always(_future_state.load(std::memory_order_acquire) >= FS_finished, nullptr); return _result; } @@ -77,10 +79,14 @@ INLINE void AsyncFuture:: get_result(TypedObject *&ptr, ReferenceCount *&ref_ptr) const { // This is thread safe, since _result may no longer be modified after the // state is changed to "done". 
- nassertd(done()) { +#ifdef NDEBUG + patomic_thread_fence(std::memory_order_acquire); +#else + nassertd(_future_state.load(std::memory_order_acquire) >= FS_finished) { ptr = nullptr; ref_ptr = nullptr; } +#endif ptr = _result; ref_ptr = _result_ref.p(); } @@ -124,7 +130,7 @@ INLINE AsyncFuture *AsyncFuture:: gather(Futures futures) { if (futures.empty()) { AsyncFuture *fut = new AsyncFuture; - fut->_future_state = (AtomicAdjust::Integer)FS_finished; + fut->_future_state.store(FS_finished, std::memory_order_relaxed); return fut; } else if (futures.size() == 1) { return futures[0].p(); @@ -167,10 +173,18 @@ try_lock_pending() { INLINE void AsyncFuture:: unlock(FutureState new_state) { nassertv(new_state != FS_locked_pending); - FutureState orig_state = (FutureState)AtomicAdjust::set(_future_state, (AtomicAdjust::Integer)new_state); + FutureState orig_state = (FutureState)_future_state.exchange(new_state, std::memory_order_release); nassertv(orig_state == FS_locked_pending); } +/** + * Atomically returns the current state. + */ +INLINE AsyncFuture::FutureState AsyncFuture:: +get_future_state() const { + return (FutureState)_future_state.load(std::memory_order_relaxed); +} + /** * Atomically changes the future state from pending to another state. Returns * true if successful, false if the future was already done. @@ -179,19 +193,18 @@ unlock(FutureState new_state) { */ INLINE bool AsyncFuture:: set_future_state(FutureState state) { - FutureState orig_state = (FutureState) - AtomicAdjust::compare_and_exchange( - _future_state, - (AtomicAdjust::Integer)FS_pending, - (AtomicAdjust::Integer)state); + patomic_unsigned_lock_free::value_type orig_state = FS_pending; + if (_future_state.compare_exchange_strong(orig_state, state, + std::memory_order_relaxed)) { + return true; + } #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) while (orig_state == FS_locked_pending) { - Thread::force_yield(); - orig_state = (FutureState)AtomicAdjust::compare_and_exchange( - _future_state, - (AtomicAdjust::Integer)FS_pending, - (AtomicAdjust::Integer)state); + Thread::relax(); + orig_state = FS_pending; + _future_state.compare_exchange_weak(orig_state, state, + std::memory_order_relaxed); } #else nassertr(orig_state != FS_locked_pending, false); diff --git a/panda/src/event/asyncFuture.cxx b/panda/src/event/asyncFuture.cxx index 45bcc0e392..d8b8b91d97 100644 --- a/panda/src/event/asyncFuture.cxx +++ b/panda/src/event/asyncFuture.cxx @@ -17,6 +17,7 @@ #include "config_event.h" #include "pStatTimer.h" #include "throw_event.h" +#include "trueClock.h" TypeHandle AsyncFuture::_type_handle; TypeHandle AsyncGatheringFuture::_type_handle; @@ -51,6 +52,11 @@ AsyncFuture:: * future was already done. Either way, done() will return true after this * call returns. * + * Please note that calling this is not a guarantee that the operation + * corresponding this future does not run. It could already be in the process + * of running, or perhaps not respond to a cancel signal. All this guarantees + * is that the future is marked as done when this call returns. + * * In the case of a task, this is equivalent to remove(). 
*/ bool AsyncFuture:: @@ -71,8 +77,7 @@ cancel() { void AsyncFuture:: output(std::ostream &out) const { out << get_type(); - FutureState state = (FutureState)AtomicAdjust::get(_future_state); - switch (state) { + switch (get_future_state()) { case FS_pending: case FS_locked_pending: out << " (pending)"; @@ -94,47 +99,66 @@ output(std::ostream &out) const { */ void AsyncFuture:: wait() { - if (done()) { - return; + if (!done()) { + PStatTimer timer(AsyncTaskChain::_wait_pcollector); + if (task_cat.is_debug()) { + task_cat.debug() + << "Waiting for future " << *this << "\n"; + } + + FutureState state = get_future_state(); + while (state < FS_finished) { + if (state == FS_locked_pending) { + // If it's locked, someone is probably about to finish it, so don't + // go to sleep. + do { + Thread::relax(); + state = get_future_state(); + } + while (state == FS_locked_pending); + + if (state >= FS_finished) { + return; + } + } + + // Go to sleep. + _future_state.wait(state, std::memory_order_relaxed); + state = get_future_state(); + } } - PStatTimer timer(AsyncTaskChain::_wait_pcollector); - if (task_cat.is_debug()) { - task_cat.debug() - << "Waiting for future " << *this << "\n"; - } - - // Continue to yield while the future isn't done. It may be more efficient - // to use a condition variable, but let's not add the extra complexity - // unless we're sure that we need it. - do { - Thread::force_yield(); - } while (!done()); + // Let's make wait() an acquire op, so that people can use a future for + // synchronization. + patomic_thread_fence(std::memory_order_acquire); } /** - * Waits until the future is done, or until the timeout is reached. + * Waits until the future is done, or until the timeout is reached. Note that + * this can be considerably less efficient than wait() without a timeout, so + * it's generally not a good idea to use this unless you really need to. */ void AsyncFuture:: wait(double timeout) { - if (done()) { - return; - } + if (!done()) { + PStatTimer timer(AsyncTaskChain::_wait_pcollector); + if (task_cat.is_debug()) { + task_cat.debug() + << "Waiting up to " << timeout << " seconds for future " << *this << "\n"; + } - PStatTimer timer(AsyncTaskChain::_wait_pcollector); - if (task_cat.is_debug()) { - task_cat.debug() - << "Waiting up to " << timeout << " seconds for future " << *this << "\n"; - } + // Continue to yield while the future isn't done. It may be more efficient + // to use a condition variable, but let's not add the extra complexity + // unless we're sure that we need it. + TrueClock *clock = TrueClock::get_global_ptr(); + double end = clock->get_short_time() + timeout; + do { + Thread::relax(); + } + while (!done() && clock->get_short_time() < end); - // Continue to yield while the future isn't done. It may be more efficient - // to use a condition variable, but let's not add the extra complexity - // unless we're sure that we need it. - ClockObject *clock = ClockObject::get_global_clock(); - double end = clock->get_real_time() + timeout; - do { - Thread::force_yield(); - } while (!done() && clock->get_real_time() < end); + patomic_thread_fence(std::memory_order_acquire); + } } /** @@ -144,8 +168,12 @@ wait(double timeout) { */ void AsyncFuture:: notify_done(bool clean_exit) { + patomic_thread_fence(std::memory_order_acquire); nassertv(done()); + // Let any calls to wait() know that we're done. + _future_state.notify_all(); + // This will only be called by the thread that managed to set the // _future_state away from the "pending" state, so this is thread safe. 
@@ -159,7 +187,7 @@ notify_done(bool clean_exit) { // It's a gathering future. Decrease the pending count on it, and if // we're the last one, call notify_done() on it. AsyncGatheringFuture *gather = (AsyncGatheringFuture *)fut; - if (!AtomicAdjust::dec(gather->_num_pending)) { + if (gather->_num_pending.fetch_sub(1, std::memory_order_relaxed) == 1) { if (gather->set_future_state(FS_finished)) { gather->notify_done(true); } @@ -223,22 +251,22 @@ void AsyncFuture:: set_result(TypedObject *ptr, ReferenceCount *ref_ptr) { // We don't strictly need to lock the future since only one thread is // allowed to call set_result(), but we might as well. - FutureState orig_state = (FutureState)AtomicAdjust:: - compare_and_exchange(_future_state, (AtomicAdjust::Integer)FS_pending, - (AtomicAdjust::Integer)FS_locked_pending); - + patomic_unsigned_lock_free::value_type orig_state = FS_pending; + if (!_future_state.compare_exchange_strong(orig_state, FS_locked_pending, + std::memory_order_relaxed)) { #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) - while (orig_state == FS_locked_pending) { - Thread::force_yield(); - orig_state = (FutureState)AtomicAdjust:: - compare_and_exchange(_future_state, (AtomicAdjust::Integer)FS_pending, - (AtomicAdjust::Integer)FS_locked_pending); - } + while (orig_state == FS_locked_pending) { + Thread::relax(); + orig_state = FS_pending; + _future_state.compare_exchange_weak(orig_state, FS_locked_pending, + std::memory_order_relaxed); + } #else - // We can't lose control between now and calling unlock() if we're using a - // cooperative threading model. - nassertv(orig_state != FS_locked_pending); + // We can't lose control between now and calling unlock() if we're using a + // cooperative threading model. + nassertv(false); #endif + } if (orig_state == FS_pending) { _result = ptr; @@ -247,15 +275,15 @@ set_result(TypedObject *ptr, ReferenceCount *ref_ptr) { // OK, now our thread owns the _waiting vector et al. notify_done(true); - - } else if (orig_state == FS_cancelled) { + } + else if (orig_state == FS_cancelled) { // This was originally illegal, but there is a chance that the future was // cancelled while another thread was setting the result. So, we drop // this, but we can issue a warning. task_cat.warning() << "Ignoring set_result() called on cancelled " << *this << "\n"; - - } else { + } + else { task_cat.error() << "set_result() was called on finished " << *this << "\n"; } @@ -371,9 +399,7 @@ AsyncGatheringFuture(AsyncFuture::Futures futures) : bool any_pending = false; - AsyncFuture::Futures::const_iterator it; - for (it = _futures.begin(); it != _futures.end(); ++it) { - AsyncFuture *fut = *it; + for (AsyncFuture *fut : _futures) { // If this returns true, the future is not yet done and we need to // register ourselves with it. This creates a circular reference, but it // is resolved when the future is completed or cancelled. @@ -382,7 +408,7 @@ AsyncGatheringFuture(AsyncFuture::Futures futures) : _manager = fut->_manager; } fut->_waiting.push_back((AsyncFuture *)this); - AtomicAdjust::inc(_num_pending); + _num_pending.fetch_add(1, std::memory_order_relaxed); fut->unlock(); any_pending = true; } @@ -391,7 +417,7 @@ AsyncGatheringFuture(AsyncFuture::Futures futures) : // Start in the done state if all the futures we were passed are done. // Note that it is only safe to set this member in this manner if indeed // no other future holds a reference to us. 
- _future_state = (AtomicAdjust::Integer)FS_finished; + _future_state.store(FS_finished, std::memory_order_relaxed); } } @@ -411,7 +437,7 @@ cancel() { // Temporarily increase the pending count so that the notify_done() // callbacks won't end up causing it to be set to "finished". - AtomicAdjust::inc(_num_pending); + _num_pending.fetch_add(1, std::memory_order_relaxed); bool any_cancelled = false; for (AsyncFuture *fut : _futures) { @@ -422,7 +448,7 @@ cancel() { // If all the futures were cancelled, change state of this future to // "cancelled" and call the notify_done() callbacks. - if (!AtomicAdjust::dec(_num_pending)) { + if (_num_pending.fetch_sub(1, std::memory_order_relaxed) == 1) { if (set_future_state(FS_cancelled)) { notify_done(false); } diff --git a/panda/src/event/asyncFuture.h b/panda/src/event/asyncFuture.h index 73abe9a3a3..20c42ce4d4 100644 --- a/panda/src/event/asyncFuture.h +++ b/panda/src/event/asyncFuture.h @@ -18,7 +18,7 @@ #include "typedReferenceCount.h" #include "typedWritableReferenceCount.h" #include "eventParameter.h" -#include "atomicAdjust.h" +#include "patomic.h" class AsyncTaskManager; class AsyncTask; @@ -110,7 +110,7 @@ private: void wake_task(AsyncTask *task); protected: - enum FutureState { + enum FutureState : patomic_unsigned_lock_free::value_type { // Pending states FS_pending, FS_locked_pending, @@ -121,12 +121,13 @@ protected: }; INLINE bool try_lock_pending(); INLINE void unlock(FutureState new_state = FS_pending); + INLINE FutureState get_future_state() const; INLINE bool set_future_state(FutureState state); AsyncTaskManager *_manager; TypedObject *_result; PT(ReferenceCount) _result_ref; - AtomicAdjust::Integer _future_state; + patomic_unsigned_lock_free _future_state; std::string _done_event; @@ -176,7 +177,7 @@ public: private: const Futures _futures; - AtomicAdjust::Integer _num_pending; + patomic _num_pending; friend class AsyncFuture; diff --git a/panda/src/gobj/animateVerticesRequest.I b/panda/src/gobj/animateVerticesRequest.I index eb6ee589d0..f278f9c6d3 100644 --- a/panda/src/gobj/animateVerticesRequest.I +++ b/panda/src/gobj/animateVerticesRequest.I @@ -27,5 +27,5 @@ AnimateVerticesRequest(GeomVertexData *geom_vertex_data) : */ INLINE bool AnimateVerticesRequest:: is_ready() const { - return (FutureState)AtomicAdjust::get(_future_state) == FS_finished; + return (FutureState)_future_state.load(std::memory_order_relaxed) == FS_finished; } diff --git a/panda/src/gobj/textureReloadRequest.I b/panda/src/gobj/textureReloadRequest.I index 1b958ac6ef..05ac192cfa 100644 --- a/panda/src/gobj/textureReloadRequest.I +++ b/panda/src/gobj/textureReloadRequest.I @@ -62,5 +62,5 @@ get_allow_compressed() const { */ INLINE bool TextureReloadRequest:: is_ready() const { - return (FutureState)AtomicAdjust::get(_future_state) == FS_finished; + return (FutureState)_future_state.load(std::memory_order_relaxed) == FS_finished; } diff --git a/panda/src/pgraph/modelFlattenRequest.I b/panda/src/pgraph/modelFlattenRequest.I index 75f177e6a8..d63ede05e4 100644 --- a/panda/src/pgraph/modelFlattenRequest.I +++ b/panda/src/pgraph/modelFlattenRequest.I @@ -39,7 +39,7 @@ get_orig() const { */ INLINE bool ModelFlattenRequest:: is_ready() const { - return (FutureState)AtomicAdjust::get(_future_state) == FS_finished; + return (FutureState)_future_state.load(std::memory_order_relaxed) == FS_finished; } /** diff --git a/panda/src/pgraph/modelLoadRequest.I b/panda/src/pgraph/modelLoadRequest.I index 3fb37a1bdc..0f01d01e08 100644 --- a/panda/src/pgraph/modelLoadRequest.I +++ 
b/panda/src/pgraph/modelLoadRequest.I @@ -46,7 +46,7 @@ get_loader() const { */ INLINE bool ModelLoadRequest:: is_ready() const { - return (FutureState)AtomicAdjust::get(_future_state) == FS_finished; + return (FutureState)_future_state.load(std::memory_order_relaxed) == FS_finished; } /** diff --git a/panda/src/pgraph/modelSaveRequest.I b/panda/src/pgraph/modelSaveRequest.I index e84b8929af..524f09f956 100644 --- a/panda/src/pgraph/modelSaveRequest.I +++ b/panda/src/pgraph/modelSaveRequest.I @@ -54,7 +54,7 @@ get_loader() const { */ INLINE bool ModelSaveRequest:: is_ready() const { - return (FutureState)AtomicAdjust::get(_future_state) == FS_finished; + return (FutureState)_future_state.load(std::memory_order_relaxed) == FS_finished; } /**
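Note (not part of the patch): below is a minimal standalone sketch of the wait()/notify_done() handshake this change implements, using C++20 std::atomic as a stand-in for Panda3D's patomic_unsigned_lock_free; the state constants, variable names, and thread bodies are illustrative only, not the engine's API. It shows the three pieces the patch combines: a relaxed polling loop while another thread briefly holds FS_locked_pending, a futex-backed atomic wait for the ordinary pending case, and a release store paired with an acquire fence so the published result is visible to the waiter.

// Sketch only: C++20 atomic wait/notify standing in for patomic_unsigned_lock_free.
#include <atomic>
#include <cstdio>
#include <thread>

enum FutureState : unsigned int {
  FS_pending = 0,
  FS_locked_pending = 1,
  FS_finished = 2,
  FS_cancelled = 3,
};

static std::atomic<unsigned int> future_state{FS_pending};
static int result = 0;  // non-atomic payload, published via release/acquire

void producer() {
  result = 42;  // write the payload before the state change
  future_state.store(FS_finished, std::memory_order_release);
  future_state.notify_all();  // wake any thread blocked in the wait below
}

void consumer() {
  unsigned int state = future_state.load(std::memory_order_relaxed);
  while (state < FS_finished) {
    if (state == FS_locked_pending) {
      // Someone is about to finish the future; spin instead of sleeping.
      do {
        std::this_thread::yield();
        state = future_state.load(std::memory_order_relaxed);
      } while (state == FS_locked_pending);
    }
    else {
      // Block until the value changes from `state` (futex-backed on most platforms).
      future_state.wait(state, std::memory_order_relaxed);
      state = future_state.load(std::memory_order_relaxed);
    }
  }
  // Pair an acquire fence with the producer's release store so the result is visible.
  std::atomic_thread_fence(std::memory_order_acquire);
  std::printf("result = %d\n", result);
}

int main() {
  std::thread t(consumer);
  producer();
  t.join();
  return 0;
}

The relaxed loads on the state word are safe here because visibility of the result is established only by the release/acquire pair, which mirrors the patch's approach of keeping done() relaxed and placing the acquire fence in get_result() and at the end of wait().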