event: Update AsyncFuture to use new atomics implementation

AsyncFuture now uses explicit barriers, and the non-timeout version of wait() is significantly more efficient because it can use the new futexes when available.
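For context, here is a minimal sketch (not part of this commit, names are illustrative) of the wait/notify pattern being adopted, written against plain C++20 std::atomic, which is presumably what patomic wraps when real threads are enabled. On Linux, std::atomic::wait() and notify_all() are implemented on top of futexes, so a waiter sleeps in the kernel instead of spinning:

    #include <atomic>
    #include <iostream>
    #include <thread>

    // 0 = pending, 1 = finished; mirrors the FutureState idea in spirit only.
    std::atomic<unsigned int> future_state{0};

    void waiter() {
      unsigned int state = future_state.load(std::memory_order_relaxed);
      while (state == 0) {
        // Blocks until the stored value is observed to differ from 'state'.
        future_state.wait(state, std::memory_order_relaxed);
        state = future_state.load(std::memory_order_relaxed);
      }
      // Pairs with the release store below, before reading any result data.
      std::atomic_thread_fence(std::memory_order_acquire);
      std::cout << "future finished\n";
    }

    int main() {
      std::thread t(waiter);
      future_state.store(1, std::memory_order_release);
      future_state.notify_all();  // wakes any futex waiters
      t.join();
    }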
rdb 2022-02-23 21:25:43 +01:00
parent 70c49a6416
commit cb8563acac
9 changed files with 125 additions and 86 deletions

View File

@@ -59,7 +59,7 @@ get_positional() const {
  */
 INLINE bool AudioLoadRequest::
 is_ready() const {
-  return (FutureState)AtomicAdjust::get(_future_state) == FS_finished;
+  return (FutureState)_future_state.load(std::memory_order_relaxed) == FS_finished;
 }

 /**
@@ -70,6 +70,5 @@ is_ready() const {
  */
 INLINE AudioSound *AudioLoadRequest::
 get_sound() const {
-  nassertr_always(done(), nullptr);
-  return (AudioSound *)_result;
+  return (AudioSound *)get_result();
 }

View File

@@ -27,7 +27,9 @@ AsyncFuture() :
  */
 INLINE bool AsyncFuture::
 done() const {
-  return (FutureState)AtomicAdjust::get(_future_state) >= FS_finished;
+  // Not an acquire barrier because the caller may not even care about the
+  // result. Instead, a fence is put in get_result().
+  return _future_state.load(std::memory_order_relaxed) >= FS_finished;
 }

 /**
@@ -35,7 +37,7 @@ done() const {
  */
 INLINE bool AsyncFuture::
 cancelled() const {
-  return (FutureState)AtomicAdjust::get(_future_state) == FS_cancelled;
+  return _future_state.load(std::memory_order_relaxed) == FS_cancelled;
 }

 /**
@@ -65,7 +67,7 @@ INLINE TypedObject *AsyncFuture::
 get_result() const {
   // This is thread safe, since _result may no longer be modified after the
   // state is changed to "done".
-  nassertr_always(done(), nullptr);
+  nassertr_always(_future_state.load(std::memory_order_acquire) >= FS_finished, nullptr);
   return _result;
 }

@@ -77,10 +79,14 @@ INLINE void AsyncFuture::
 get_result(TypedObject *&ptr, ReferenceCount *&ref_ptr) const {
   // This is thread safe, since _result may no longer be modified after the
   // state is changed to "done".
-  nassertd(done()) {
+#ifdef NDEBUG
+  patomic_thread_fence(std::memory_order_acquire);
+#else
+  nassertd(_future_state.load(std::memory_order_acquire) >= FS_finished) {
     ptr = nullptr;
     ref_ptr = nullptr;
   }
+#endif
   ptr = _result;
   ref_ptr = _result_ref.p();
 }
@@ -124,7 +130,7 @@ INLINE AsyncFuture *AsyncFuture::
 gather(Futures futures) {
   if (futures.empty()) {
     AsyncFuture *fut = new AsyncFuture;
-    fut->_future_state = (AtomicAdjust::Integer)FS_finished;
+    fut->_future_state.store(FS_finished, std::memory_order_relaxed);
     return fut;
   } else if (futures.size() == 1) {
     return futures[0].p();
@@ -167,10 +173,18 @@ try_lock_pending() {
 INLINE void AsyncFuture::
 unlock(FutureState new_state) {
   nassertv(new_state != FS_locked_pending);
-  FutureState orig_state = (FutureState)AtomicAdjust::set(_future_state, (AtomicAdjust::Integer)new_state);
+  FutureState orig_state = (FutureState)_future_state.exchange(new_state, std::memory_order_release);
   nassertv(orig_state == FS_locked_pending);
 }

+/**
+ * Atomically returns the current state.
+ */
+INLINE AsyncFuture::FutureState AsyncFuture::
+get_future_state() const {
+  return (FutureState)_future_state.load(std::memory_order_relaxed);
+}
+
 /**
  * Atomically changes the future state from pending to another state. Returns
  * true if successful, false if the future was already done.
@@ -179,19 +193,18 @@ unlock(FutureState new_state) {
  */
 INLINE bool AsyncFuture::
 set_future_state(FutureState state) {
-  FutureState orig_state = (FutureState)
-    AtomicAdjust::compare_and_exchange(
-      _future_state,
-      (AtomicAdjust::Integer)FS_pending,
-      (AtomicAdjust::Integer)state);
+  patomic_unsigned_lock_free::value_type orig_state = FS_pending;
+  if (_future_state.compare_exchange_strong(orig_state, state,
+                                            std::memory_order_relaxed)) {
+    return true;
+  }

 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
   while (orig_state == FS_locked_pending) {
-    Thread::force_yield();
-    orig_state = (FutureState)AtomicAdjust::compare_and_exchange(
-      _future_state,
-      (AtomicAdjust::Integer)FS_pending,
-      (AtomicAdjust::Integer)state);
+    Thread::relax();
+    orig_state = FS_pending;
+    _future_state.compare_exchange_weak(orig_state, state,
+                                        std::memory_order_relaxed);
   }
 #else
   nassertr(orig_state != FS_locked_pending, false);
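Aside (not from the commit itself): the compare_exchange calls above rely on the C++ convention that a failed compare-and-swap writes the value it actually observed back into the "expected" argument, which is how set_future_state() distinguishes a locked future from one that is already done. A small self-contained illustration, with enum values taken from the header below:

    #include <atomic>
    #include <cassert>

    enum FutureState : unsigned int {
      FS_pending, FS_locked_pending, FS_finished, FS_cancelled
    };

    int main() {
      std::atomic<unsigned int> state{FS_locked_pending};
      unsigned int expected = FS_pending;
      bool ok = state.compare_exchange_strong(expected, FS_finished,
                                              std::memory_order_relaxed);
      assert(!ok);  // the CAS failed because the state was not FS_pending...
      assert(expected == FS_locked_pending);  // ...and 'expected' now says why
    }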

View File

@@ -17,6 +17,7 @@
 #include "config_event.h"
 #include "pStatTimer.h"
 #include "throw_event.h"
+#include "trueClock.h"

 TypeHandle AsyncFuture::_type_handle;
 TypeHandle AsyncGatheringFuture::_type_handle;
@@ -51,6 +52,11 @@
  * future was already done. Either way, done() will return true after this
  * call returns.
  *
+ * Please note that calling this is not a guarantee that the operation
+ * corresponding to this future does not run.  It could already be in the
+ * process of running, or it may not respond to a cancel signal.  All this
+ * guarantees is that the future is marked as done when this call returns.
+ *
  * In the case of a task, this is equivalent to remove().
  */
 bool AsyncFuture::
@@ -71,8 +77,7 @@ cancel() {
 void AsyncFuture::
 output(std::ostream &out) const {
   out << get_type();
-  FutureState state = (FutureState)AtomicAdjust::get(_future_state);
-  switch (state) {
+  switch (get_future_state()) {
   case FS_pending:
   case FS_locked_pending:
     out << " (pending)";
@@ -94,33 +99,48 @@ output(std::ostream &out) const {
  */
 void AsyncFuture::
 wait() {
-  if (done()) {
-    return;
-  }
+  if (!done()) {
     PStatTimer timer(AsyncTaskChain::_wait_pcollector);
     if (task_cat.is_debug()) {
       task_cat.debug()
         << "Waiting for future " << *this << "\n";
     }

-  // Continue to yield while the future isn't done.  It may be more efficient
-  // to use a condition variable, but let's not add the extra complexity
-  // unless we're sure that we need it.
-  do {
-    Thread::force_yield();
-  } while (!done());
+    FutureState state = get_future_state();
+    while (state < FS_finished) {
+      if (state == FS_locked_pending) {
+        // If it's locked, someone is probably about to finish it, so don't
+        // go to sleep.
+        do {
+          Thread::relax();
+          state = get_future_state();
+        }
+        while (state == FS_locked_pending);
+
+        if (state >= FS_finished) {
+          return;
+        }
+      }
+
+      // Go to sleep.
+      _future_state.wait(state, std::memory_order_relaxed);
+      state = get_future_state();
+    }
+  }
+
+  // Let's make wait() an acquire op, so that people can use a future for
+  // synchronization.
+  patomic_thread_fence(std::memory_order_acquire);
 }

 /**
- * Waits until the future is done, or until the timeout is reached.
+ * Waits until the future is done, or until the timeout is reached.  Note that
+ * this can be considerably less efficient than wait() without a timeout, so
+ * it's generally not a good idea to use this unless you really need to.
  */
 void AsyncFuture::
 wait(double timeout) {
-  if (done()) {
-    return;
-  }
-
+  if (!done()) {
     PStatTimer timer(AsyncTaskChain::_wait_pcollector);
     if (task_cat.is_debug()) {
       task_cat.debug()
@@ -130,11 +150,15 @@ wait(double timeout) {
     // Continue to yield while the future isn't done.  It may be more efficient
     // to use a condition variable, but let's not add the extra complexity
     // unless we're sure that we need it.
-  ClockObject *clock = ClockObject::get_global_clock();
-  double end = clock->get_real_time() + timeout;
-  do {
-    Thread::force_yield();
-  } while (!done() && clock->get_real_time() < end);
+    TrueClock *clock = TrueClock::get_global_ptr();
+    double end = clock->get_short_time() + timeout;
+    do {
+      Thread::relax();
+    }
+    while (!done() && clock->get_short_time() < end);
+
+    patomic_thread_fence(std::memory_order_acquire);
+  }
 }

 /**
@@ -144,8 +168,12 @@ wait(double timeout) {
  */
 void AsyncFuture::
 notify_done(bool clean_exit) {
+  patomic_thread_fence(std::memory_order_acquire);
   nassertv(done());

+  // Let any calls to wait() know that we're done.
+  _future_state.notify_all();
+
   // This will only be called by the thread that managed to set the
   // _future_state away from the "pending" state, so this is thread safe.
@@ -159,7 +187,7 @@ notify_done(bool clean_exit) {
       // It's a gathering future.  Decrease the pending count on it, and if
       // we're the last one, call notify_done() on it.
       AsyncGatheringFuture *gather = (AsyncGatheringFuture *)fut;
-      if (!AtomicAdjust::dec(gather->_num_pending)) {
+      if (gather->_num_pending.fetch_sub(1, std::memory_order_relaxed) == 1) {
         if (gather->set_future_state(FS_finished)) {
           gather->notify_done(true);
         }
@@ -223,22 +251,22 @@ void AsyncFuture::
 set_result(TypedObject *ptr, ReferenceCount *ref_ptr) {
   // We don't strictly need to lock the future since only one thread is
   // allowed to call set_result(), but we might as well.
-  FutureState orig_state = (FutureState)AtomicAdjust::
-    compare_and_exchange(_future_state, (AtomicAdjust::Integer)FS_pending,
-                         (AtomicAdjust::Integer)FS_locked_pending);
+  patomic_unsigned_lock_free::value_type orig_state = FS_pending;
+  if (!_future_state.compare_exchange_strong(orig_state, FS_locked_pending,
+                                             std::memory_order_relaxed)) {
 #if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
     while (orig_state == FS_locked_pending) {
-      Thread::force_yield();
-      orig_state = (FutureState)AtomicAdjust::
-        compare_and_exchange(_future_state, (AtomicAdjust::Integer)FS_pending,
-                             (AtomicAdjust::Integer)FS_locked_pending);
+      Thread::relax();
+      orig_state = FS_pending;
+      _future_state.compare_exchange_weak(orig_state, FS_locked_pending,
+                                          std::memory_order_relaxed);
     }
 #else
     // We can't lose control between now and calling unlock() if we're using a
     // cooperative threading model.
-    nassertv(orig_state != FS_locked_pending);
+    nassertv(false);
 #endif
+  }

   if (orig_state == FS_pending) {
     _result = ptr;
@@ -247,15 +275,15 @@ set_result(TypedObject *ptr, ReferenceCount *ref_ptr) {

     // OK, now our thread owns the _waiting vector et al.
     notify_done(true);
-  } else if (orig_state == FS_cancelled) {
+  }
+  else if (orig_state == FS_cancelled) {
     // This was originally illegal, but there is a chance that the future was
     // cancelled while another thread was setting the result.  So, we drop
     // this, but we can issue a warning.
     task_cat.warning()
       << "Ignoring set_result() called on cancelled " << *this << "\n";
-  } else {
+  }
+  else {
     task_cat.error()
       << "set_result() was called on finished " << *this << "\n";
   }
@@ -371,9 +399,7 @@ AsyncGatheringFuture(AsyncFuture::Futures futures) :
   bool any_pending = false;
-  AsyncFuture::Futures::const_iterator it;
-  for (it = _futures.begin(); it != _futures.end(); ++it) {
-    AsyncFuture *fut = *it;
+  for (AsyncFuture *fut : _futures) {
     // If this returns true, the future is not yet done and we need to
     // register ourselves with it.  This creates a circular reference, but it
     // is resolved when the future is completed or cancelled.
@@ -382,7 +408,7 @@ AsyncGatheringFuture(AsyncFuture::Futures futures) :
         _manager = fut->_manager;
       }
       fut->_waiting.push_back((AsyncFuture *)this);
-      AtomicAdjust::inc(_num_pending);
+      _num_pending.fetch_add(1, std::memory_order_relaxed);
       fut->unlock();
       any_pending = true;
     }
@@ -391,7 +417,7 @@ AsyncGatheringFuture(AsyncFuture::Futures futures) :
     // Start in the done state if all the futures we were passed are done.
     // Note that it is only safe to set this member in this manner if indeed
     // no other future holds a reference to us.
-    _future_state = (AtomicAdjust::Integer)FS_finished;
+    _future_state.store(FS_finished, std::memory_order_relaxed);
   }
 }
@@ -411,7 +437,7 @@ cancel() {
     // Temporarily increase the pending count so that the notify_done()
     // callbacks won't end up causing it to be set to "finished".
-    AtomicAdjust::inc(_num_pending);
+    _num_pending.fetch_add(1, std::memory_order_relaxed);

     bool any_cancelled = false;
     for (AsyncFuture *fut : _futures) {
@@ -422,7 +448,7 @@ cancel() {
     // If all the futures were cancelled, change state of this future to
     // "cancelled" and call the notify_done() callbacks.
-    if (!AtomicAdjust::dec(_num_pending)) {
+    if (_num_pending.fetch_sub(1, std::memory_order_relaxed) == 1) {
       if (set_future_state(FS_cancelled)) {
         notify_done(false);
       }
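Aside (not from the commit itself): patomic's fetch_sub(), like std::atomic's, returns the value held *before* the decrement, so comparing the result against 1 detects the thread that retired the last pending future. The old AtomicAdjust::dec() instead reported whether the count was still nonzero, hence the previous "!dec(...)" form. A short illustration:

    #include <atomic>
    #include <iostream>

    int main() {
      std::atomic<size_t> num_pending{3};
      for (int i = 0; i < 3; ++i) {
        if (num_pending.fetch_sub(1, std::memory_order_relaxed) == 1) {
          std::cout << "last pending future retired\n";  // printed once
        }
      }
    }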

View File

@@ -18,7 +18,7 @@
 #include "typedReferenceCount.h"
 #include "typedWritableReferenceCount.h"
 #include "eventParameter.h"
-#include "atomicAdjust.h"
+#include "patomic.h"

 class AsyncTaskManager;
 class AsyncTask;
@@ -110,7 +110,7 @@ private:
   void wake_task(AsyncTask *task);

 protected:
-  enum FutureState {
+  enum FutureState : patomic_unsigned_lock_free::value_type {
     // Pending states
     FS_pending,
     FS_locked_pending,
@@ -121,12 +121,13 @@ protected:
   };

   INLINE bool try_lock_pending();
   INLINE void unlock(FutureState new_state = FS_pending);
+  INLINE FutureState get_future_state() const;
   INLINE bool set_future_state(FutureState state);

   AsyncTaskManager *_manager;
   TypedObject *_result;
   PT(ReferenceCount) _result_ref;

-  AtomicAdjust::Integer _future_state;
+  patomic_unsigned_lock_free _future_state;
   std::string _done_event;
@@ -176,7 +177,7 @@ public:
 private:
   const Futures _futures;

-  AtomicAdjust::Integer _num_pending;
+  patomic<size_t> _num_pending;

   friend class AsyncFuture;

View File

@@ -27,5 +27,5 @@ AnimateVerticesRequest(GeomVertexData *geom_vertex_data) :
  */
 INLINE bool AnimateVerticesRequest::
 is_ready() const {
-  return (FutureState)AtomicAdjust::get(_future_state) == FS_finished;
+  return (FutureState)_future_state.load(std::memory_order_relaxed) == FS_finished;
 }

View File

@@ -62,5 +62,5 @@ get_allow_compressed() const {
  */
 INLINE bool TextureReloadRequest::
 is_ready() const {
-  return (FutureState)AtomicAdjust::get(_future_state) == FS_finished;
+  return (FutureState)_future_state.load(std::memory_order_relaxed) == FS_finished;
 }

View File

@@ -39,7 +39,7 @@ get_orig() const {
  */
 INLINE bool ModelFlattenRequest::
 is_ready() const {
-  return (FutureState)AtomicAdjust::get(_future_state) == FS_finished;
+  return (FutureState)_future_state.load(std::memory_order_relaxed) == FS_finished;
 }

 /**

View File

@@ -46,7 +46,7 @@ get_loader() const {
  */
 INLINE bool ModelLoadRequest::
 is_ready() const {
-  return (FutureState)AtomicAdjust::get(_future_state) == FS_finished;
+  return (FutureState)_future_state.load(std::memory_order_relaxed) == FS_finished;
 }

 /**

View File

@@ -54,7 +54,7 @@ get_loader() const {
  */
 INLINE bool ModelSaveRequest::
 is_ready() const {
-  return (FutureState)AtomicAdjust::get(_future_state) == FS_finished;
+  return (FutureState)_future_state.load(std::memory_order_relaxed) == FS_finished;
 }

 /**