friendlier interface for task management

This commit is contained in:
David Rose 2008-10-14 02:17:38 +00:00
parent 732b4f0067
commit 02d98e41c7
17 changed files with 163 additions and 120 deletions

View File

@ -95,25 +95,6 @@ get_delay() const {
return _delay;
}
////////////////////////////////////////////////////////////////////
// Function: AsyncTask::get_wake_time
// Access: Published
// Description: If this task has been added to an AsyncTaskManager
// with a delay in effect, this returns the time at
// which the task is expected to awaken. It has no
// meaning if the task has not yet been added to a
// queue, or if there was no delay in effect at the time
// the task was added.
//
// It is only valid to call this if the task's status is
// S_sleeping.
////////////////////////////////////////////////////////////////////
INLINE double AsyncTask::
get_wake_time() const {
nassertr(_state == S_sleeping, 0.0);
return _wake_time;
}
////////////////////////////////////////////////////////////////////
// Function: AsyncTask::get_start_time
// Access: Published

View File

@ -87,6 +87,33 @@ remove() {
}
}
////////////////////////////////////////////////////////////////////
// Function: AsyncTask::get_wake_time
// Access: Published
// Description: If this task has been added to an AsyncTaskManager
// with a delay in effect, this returns the time at
// which the task is expected to awaken. It has no
// meaning if the task has not yet been added to a
// queue, or if there was no delay in effect at the time
// the task was added.
//
// If the task's status is not S_sleeping, this returns
// 0.0.
////////////////////////////////////////////////////////////////////
double AsyncTask::
get_wake_time() const {
if (_manager != (AsyncTaskManager *)NULL) {
MutexHolder holder(_manager->_lock);
if (_state == S_sleeping) {
return _wake_time;
}
}
// If it's not on any manager, or it's not sleeping, the wake time
// is 0.0.
return 0.0;
}
////////////////////////////////////////////////////////////////////
// Function: AsyncTask::get_elapsed_time
// Access: Published

View File

@ -77,7 +77,7 @@ PUBLISHED:
INLINE void clear_delay();
INLINE bool has_delay() const;
INLINE double get_delay() const;
INLINE double get_wake_time() const;
double get_wake_time() const;
INLINE double get_start_time() const;
double get_elapsed_time() const;

View File

@ -25,3 +25,29 @@ INLINE bool AsyncTaskChain::
is_started() const {
return (_state == S_started);
}
////////////////////////////////////////////////////////////////////
// Function: AsyncTaskChain::do_get_next_wake_time
// Access: Protected
// Description: Returns the time at which the next sleeping thread
// will awaken, or -1 if there are no sleeping threads.
// Assumes the lock is already held.
////////////////////////////////////////////////////////////////////
INLINE double AsyncTaskChain::
do_get_next_wake_time() const {
if (!_sleeping.empty()) {
return _sleeping.front()->_wake_time;
}
return -1.0;
}
////////////////////////////////////////////////////////////////////
// Function: AsyncTaskChain::get_wake_time
// Access: Protected, Static
// Description: Returns the time at which the indicated thread
// will awaken. Assumes the lock is already held.
////////////////////////////////////////////////////////////////////
INLINE double AsyncTaskChain::
get_wake_time(AsyncTask *task) {
return task->_wake_time;
}

View File

@ -765,7 +765,7 @@ service_one_task(AsyncTaskChain::AsyncTaskChainThread *thread) {
if (task_cat.is_spam()) {
task_cat.spam()
<< "Sleeping " << *task << ", wake time at "
<< task->get_wake_time() - now << "\n";
<< task->_wake_time - now << "\n";
}
_cvar.notify_all();
}
@ -927,12 +927,12 @@ finish_sort_group() {
// Check for any sleeping tasks that need to be woken.
double now = _manager->_clock->get_frame_time();
while (!_sleeping.empty() && _sleeping.front()->get_wake_time() <= now) {
while (!_sleeping.empty() && _sleeping.front()->_wake_time <= now) {
PT(AsyncTask) task = _sleeping.front();
if (task_cat.is_spam()) {
task_cat.spam()
<< "Waking " << *task << ", wake time at "
<< task->get_wake_time() - now << "\n";
<< task->_wake_time - now << "\n";
}
pop_heap(_sleeping.begin(), _sleeping.end(), AsyncTaskSortWakeTime());
_sleeping.pop_back();
@ -948,7 +948,7 @@ finish_sort_group() {
} else {
task_cat.spam()
<< "Next sleeper: " << *_sleeping.front() << ", wake time at "
<< _sleeping.front()->get_wake_time() - now << "\n";
<< _sleeping.front()->_wake_time - now << "\n";
}
}
@ -1281,20 +1281,6 @@ cleanup_pickup_mode() {
}
}
////////////////////////////////////////////////////////////////////
// Function: AsyncTaskChain::do_get_next_wake_time
// Access: Protected
// Description:
////////////////////////////////////////////////////////////////////
double AsyncTaskChain::
do_get_next_wake_time() const {
if (!_sleeping.empty()) {
PT(AsyncTask) task = _sleeping.front();
return task->_wake_time;
}
return -1.0;
}
////////////////////////////////////////////////////////////////////
// Function: AsyncTaskChain::do_output
// Access: Protected
@ -1519,7 +1505,7 @@ thread_main() {
PStatTimer timer(_wait_pcollector);
_chain->_cvar.wait();
} else {
double wake_time = _chain->_sleeping.front()->get_wake_time();
double wake_time = _chain->do_get_next_wake_time();
double now = _chain->_manager->_clock->get_frame_time();
double timeout = max(wake_time - now, 0.0);
PStatTimer timer(_wait_pcollector);

View File

@ -117,7 +117,8 @@ protected:
AsyncTaskCollection do_get_sleeping_tasks() const;
void do_poll();
void cleanup_pickup_mode();
double do_get_next_wake_time() const;
INLINE double do_get_next_wake_time() const;
static INLINE double get_wake_time(AsyncTask *task);
void do_output(ostream &out) const;
void do_write(ostream &out, int indent_level) const;
@ -136,7 +137,7 @@ protected:
class AsyncTaskSortWakeTime {
public:
bool operator () (AsyncTask *a, AsyncTask *b) const {
return a->get_wake_time() > b->get_wake_time();
return AsyncTaskChain::get_wake_time(a) > AsyncTaskChain::get_wake_time(b);
}
};
@ -209,6 +210,7 @@ private:
friend class AsyncTaskChainThread;
friend class AsyncTask;
friend class AsyncTaskManager;
friend class AsyncTaskSortWakeTime;
};
INLINE ostream &operator << (ostream &out, const AsyncTaskChain &chain) {

View File

@ -234,6 +234,28 @@ get_task(int index) const {
return _tasks[index];
}
////////////////////////////////////////////////////////////////////
// Function: AsyncTaskCollection::remove_task
// Access: Published
// Description: Removes the nth AsyncTask from the collection.
////////////////////////////////////////////////////////////////////
void AsyncTaskCollection::
remove_task(int index) {
// If the pointer to our internal array is shared by any other
// AsyncTaskCollections, we have to copy the array now so we won't
// inadvertently modify any of our brethren AsyncTaskCollection
// objects.
if (_tasks.get_ref_count() > 1) {
AsyncTasks old_tasks = _tasks;
_tasks = AsyncTasks::empty_array(0);
_tasks.v() = old_tasks.v();
}
nassertv(index >= 0 && index < (int)_tasks.size());
_tasks.erase(_tasks.begin() + index);
}
////////////////////////////////////////////////////////////////////
// Function: AsyncTaskCollection::operator []
// Access: Published

View File

@ -46,6 +46,7 @@ PUBLISHED:
int get_num_tasks() const;
AsyncTask *get_task(int index) const;
void remove_task(int index);
AsyncTask *operator [] (int index) const;
void output(ostream &out) const;

View File

@ -305,9 +305,6 @@ __getattr__(const string &attr_name) const {
} else if (attr_name == "name") {
return PyString_FromString(get_name().c_str());
} else if (attr_name == "wakeTime") {
if (get_state() != S_sleeping) {
Py_RETURN_NONE;
}
return PyFloat_FromDouble(get_wake_time());
} else if (attr_name == "delayTime") {
if (!has_delay()) {

View File

@ -74,11 +74,11 @@ void ConditionVarDebug::
wait() {
_mutex._global_lock->acquire();
Thread *this_thread = Thread::get_current_thread();
Thread *current_thread = Thread::get_current_thread();
if (!_mutex.do_debug_is_locked()) {
ostringstream ostr;
ostr << *this_thread << " attempted to wait on "
ostr << *current_thread << " attempted to wait on "
<< *this << " without holding " << _mutex;
nassert_raise(ostr.str());
_mutex._global_lock->release();
@ -87,25 +87,25 @@ wait() {
if (thread_cat->is_spam()) {
thread_cat.spam()
<< *this_thread << " waiting on " << *this << "\n";
<< *current_thread << " waiting on " << *this << "\n";
}
nassertd(this_thread->_waiting_on_cvar == NULL &&
this_thread->_waiting_on_cvar_full == NULL) {
nassertd(current_thread->_waiting_on_cvar == NULL &&
current_thread->_waiting_on_cvar_full == NULL) {
}
this_thread->_waiting_on_cvar = this;
current_thread->_waiting_on_cvar = this;
_mutex.do_release();
_impl.wait(); // temporarily releases _global_lock
_mutex.do_lock();
_mutex.do_acquire(current_thread);
nassertd(this_thread->_waiting_on_cvar == this) {
nassertd(current_thread->_waiting_on_cvar == this) {
}
this_thread->_waiting_on_cvar = NULL;
current_thread->_waiting_on_cvar = NULL;
if (thread_cat.is_spam()) {
thread_cat.spam()
<< *this_thread << " awake on " << *this << "\n";
<< *current_thread << " awake on " << *this << "\n";
}
_mutex._global_lock->release();
@ -115,7 +115,7 @@ wait() {
// Function: ConditionVarDebug::wait
// Access: Published
// Description: Waits on the condition, with a timeout. The function
// will return when the condition variable is signaled,
// will return when the condition variable is notified,
// or the timeout occurs. There is no way to directly
// tell which happened, and it is possible that neither
// in fact happened (spurious wakeups are possible).
@ -126,11 +126,11 @@ void ConditionVarDebug::
wait(double timeout) {
_mutex._global_lock->acquire();
Thread *this_thread = Thread::get_current_thread();
Thread *current_thread = Thread::get_current_thread();
if (!_mutex.do_debug_is_locked()) {
ostringstream ostr;
ostr << *this_thread << " attempted to wait on "
ostr << *current_thread << " attempted to wait on "
<< *this << " without holding " << _mutex;
nassert_raise(ostr.str());
_mutex._global_lock->release();
@ -139,33 +139,33 @@ wait(double timeout) {
if (thread_cat.is_spam()) {
thread_cat.spam()
<< *this_thread << " waiting on " << *this
<< *current_thread << " waiting on " << *this
<< ", with timeout " << timeout << "\n";
}
nassertd(this_thread->_waiting_on_cvar == NULL &&
this_thread->_waiting_on_cvar_full == NULL) {
nassertd(current_thread->_waiting_on_cvar == NULL &&
current_thread->_waiting_on_cvar_full == NULL) {
}
this_thread->_waiting_on_cvar = this;
current_thread->_waiting_on_cvar = this;
_mutex.do_release();
_impl.wait(timeout); // temporarily releases _global_lock
_mutex.do_lock();
_mutex.do_acquire(current_thread);
nassertd(this_thread->_waiting_on_cvar == this) {
nassertd(current_thread->_waiting_on_cvar == this) {
}
this_thread->_waiting_on_cvar = NULL;
current_thread->_waiting_on_cvar = NULL;
if (thread_cat.is_spam()) {
thread_cat.spam()
<< *this_thread << " awake on " << *this << "\n";
<< *current_thread << " awake on " << *this << "\n";
}
_mutex._global_lock->release();
}
////////////////////////////////////////////////////////////////////
// Function: ConditionVarDebug::signal
// Function: ConditionVarDebug::notify
// Access: Published
// Description: Informs one of the other threads who are currently
// blocked on wait() that the relevant condition has
@ -179,17 +179,17 @@ wait(double timeout) {
// will not release the mutex.
//
// If no threads are waiting, this is a no-op: the
// signal is lost.
// notify event is lost.
////////////////////////////////////////////////////////////////////
void ConditionVarDebug::
notify() {
_mutex._global_lock->acquire();
Thread *this_thread = Thread::get_current_thread();
Thread *current_thread = Thread::get_current_thread();
if (!_mutex.do_debug_is_locked()) {
ostringstream ostr;
ostr << *this_thread << " attempted to signal "
ostr << *current_thread << " attempted to notify "
<< *this << " without holding " << _mutex;
nassert_raise(ostr.str());
_mutex._global_lock->release();
@ -198,7 +198,7 @@ notify() {
if (thread_cat->is_spam()) {
thread_cat.spam()
<< *this_thread << " signalling " << *this << "\n";
<< *current_thread << " notifying " << *this << "\n";
}
_impl.notify();

View File

@ -108,7 +108,7 @@ wait() {
// Function: ConditionVarDirect::wait
// Access: Published
// Description: Waits on the condition, with a timeout. The function
// will return when the condition variable is signaled,
// will return when the condition variable is notified,
// or the timeout occurs. There is no way to directly
// tell which happened, and it is possible that neither
// in fact happened (spurious wakeups are possible).
@ -122,7 +122,7 @@ wait(double timeout) {
}
////////////////////////////////////////////////////////////////////
// Function: ConditionVarDirect::signal
// Function: ConditionVarDirect::notify
// Access: Public
// Description: Informs one of the other threads who are currently
// blocked on wait() that the relevant condition has
@ -136,7 +136,7 @@ wait(double timeout) {
// will not release the mutex.
//
// If no threads are waiting, this is a no-op: the
// signal is lost.
// notify event is lost.
////////////////////////////////////////////////////////////////////
INLINE void ConditionVarDirect::
notify() {

View File

@ -74,11 +74,11 @@ void ConditionVarFullDebug::
wait() {
_mutex._global_lock->acquire();
Thread *this_thread = Thread::get_current_thread();
Thread *current_thread = Thread::get_current_thread();
if (!_mutex.do_debug_is_locked()) {
ostringstream ostr;
ostr << *this_thread << " attempted to wait on "
ostr << *current_thread << " attempted to wait on "
<< *this << " without holding " << _mutex;
nassert_raise(ostr.str());
_mutex._global_lock->release();
@ -87,25 +87,25 @@ wait() {
if (thread_cat->is_spam()) {
thread_cat.spam()
<< *this_thread << " waiting on " << *this << "\n";
<< *current_thread << " waiting on " << *this << "\n";
}
nassertd(this_thread->_waiting_on_cvar == NULL &&
this_thread->_waiting_on_cvar_full == NULL) {
nassertd(current_thread->_waiting_on_cvar == NULL &&
current_thread->_waiting_on_cvar_full == NULL) {
}
this_thread->_waiting_on_cvar_full = this;
current_thread->_waiting_on_cvar_full = this;
_mutex.do_release();
_impl.wait(); // temporarily releases _global_lock
_mutex.do_lock();
_mutex.do_acquire(current_thread);
nassertd(this_thread->_waiting_on_cvar_full == this) {
nassertd(current_thread->_waiting_on_cvar_full == this) {
}
this_thread->_waiting_on_cvar_full = NULL;
current_thread->_waiting_on_cvar_full = NULL;
if (thread_cat.is_spam()) {
thread_cat.spam()
<< *this_thread << " awake on " << *this << "\n";
<< *current_thread << " awake on " << *this << "\n";
}
_mutex._global_lock->release();
@ -115,7 +115,7 @@ wait() {
// Function: ConditionVarFullDebug::wait
// Access: Published
// Description: Waits on the condition, with a timeout. The function
// will return when the condition variable is signaled,
// will return when the condition variable is notified,
// or the timeout occurs. There is no way to directly
// tell which happened, and it is possible that neither
// in fact happened (spurious wakeups are possible).
@ -126,11 +126,11 @@ void ConditionVarFullDebug::
wait(double timeout) {
_mutex._global_lock->acquire();
Thread *this_thread = Thread::get_current_thread();
Thread *current_thread = Thread::get_current_thread();
if (!_mutex.do_debug_is_locked()) {
ostringstream ostr;
ostr << *this_thread << " attempted to wait on "
ostr << *current_thread << " attempted to wait on "
<< *this << " without holding " << _mutex;
nassert_raise(ostr.str());
_mutex._global_lock->release();
@ -139,33 +139,33 @@ wait(double timeout) {
if (thread_cat.is_spam()) {
thread_cat.spam()
<< *this_thread << " waiting on " << *this
<< *current_thread << " waiting on " << *this
<< ", with timeout " << timeout << "\n";
}
nassertd(this_thread->_waiting_on_cvar == NULL &&
this_thread->_waiting_on_cvar_full == NULL) {
nassertd(current_thread->_waiting_on_cvar == NULL &&
current_thread->_waiting_on_cvar_full == NULL) {
}
this_thread->_waiting_on_cvar_full = this;
current_thread->_waiting_on_cvar_full = this;
_mutex.do_release();
_impl.wait(timeout); // temporarily releases _global_lock
_mutex.do_lock();
_mutex.do_acquire(current_thread);
nassertd(this_thread->_waiting_on_cvar_full == this) {
nassertd(current_thread->_waiting_on_cvar_full == this) {
}
this_thread->_waiting_on_cvar_full = NULL;
current_thread->_waiting_on_cvar_full = NULL;
if (thread_cat.is_spam()) {
thread_cat.spam()
<< *this_thread << " awake on " << *this << "\n";
<< *current_thread << " awake on " << *this << "\n";
}
_mutex._global_lock->release();
}
////////////////////////////////////////////////////////////////////
// Function: ConditionVarFullDebug::signal
// Function: ConditionVarFullDebug::notify
// Access: Published
// Description: Informs one of the other threads who are currently
// blocked on wait() that the relevant condition has
@ -179,17 +179,17 @@ wait(double timeout) {
// will not release the mutex.
//
// If no threads are waiting, this is a no-op: the
// signal is lost.
// notify event is lost.
////////////////////////////////////////////////////////////////////
void ConditionVarFullDebug::
notify() {
_mutex._global_lock->acquire();
Thread *this_thread = Thread::get_current_thread();
Thread *current_thread = Thread::get_current_thread();
if (!_mutex.do_debug_is_locked()) {
ostringstream ostr;
ostr << *this_thread << " attempted to signal "
ostr << *current_thread << " attempted to notify "
<< *this << " without holding " << _mutex;
nassert_raise(ostr.str());
_mutex._global_lock->release();
@ -198,7 +198,7 @@ notify() {
if (thread_cat->is_spam()) {
thread_cat.spam()
<< *this_thread << " signalling " << *this << "\n";
<< *current_thread << " notifying " << *this << "\n";
}
_impl.notify();
@ -206,7 +206,7 @@ notify() {
}
////////////////////////////////////////////////////////////////////
// Function: ConditionVarFullDebug::signal
// Function: ConditionVarFullDebug::notify
// Access: Published
// Description: Informs all of the other threads who are currently
// blocked on wait() that the relevant condition has
@ -217,17 +217,17 @@ notify() {
// will not release the mutex.
//
// If no threads are waiting, this is a no-op: the
// signal is lost.
// notify event is lost.
////////////////////////////////////////////////////////////////////
void ConditionVarFullDebug::
notify_all() {
_mutex._global_lock->acquire();
Thread *this_thread = Thread::get_current_thread();
Thread *current_thread = Thread::get_current_thread();
if (!_mutex.do_debug_is_locked()) {
ostringstream ostr;
ostr << *this_thread << " attempted to signal "
ostr << *current_thread << " attempted to notify "
<< *this << " without holding " << _mutex;
nassert_raise(ostr.str());
_mutex._global_lock->release();
@ -236,7 +236,7 @@ notify_all() {
if (thread_cat->is_spam()) {
thread_cat.spam()
<< *this_thread << " signalling all " << *this << "\n";
<< *current_thread << " notifying all " << *this << "\n";
}
_impl.notify_all();

View File

@ -108,7 +108,7 @@ wait() {
// Function: ConditionVarFullDirect::wait
// Access: Published
// Description: Waits on the condition, with a timeout. The function
// will return when the condition variable is signaled,
// will return when the condition variable is notified,
// or the timeout occurs. There is no way to directly
// tell which happened, and it is possible that neither
// in fact happened (spurious wakeups are possible).
@ -122,7 +122,7 @@ wait(double timeout) {
}
////////////////////////////////////////////////////////////////////
// Function: ConditionVarFullDirect::signal
// Function: ConditionVarFullDirect::notify
// Access: Published
// Description: Informs one of the other threads who are currently
// blocked on wait() that the relevant condition has
@ -136,7 +136,7 @@ wait(double timeout) {
// will not release the mutex.
//
// If no threads are waiting, this is a no-op: the
// signal is lost.
// notify is lost.
////////////////////////////////////////////////////////////////////
INLINE void ConditionVarFullDirect::
notify() {
@ -145,7 +145,7 @@ notify() {
}
////////////////////////////////////////////////////////////////////
// Function: ConditionVarFullDirect::signal_all
// Function: ConditionVarFullDirect::notify_all
// Access: Published
// Description: Informs all of the other threads who are currently
// blocked on wait() that the relevant condition has
@ -156,7 +156,7 @@ notify() {
// will not release the mutex.
//
// If no threads are waiting, this is a no-op: the
// signal is lost.
// notify event is lost.
////////////////////////////////////////////////////////////////////
INLINE void ConditionVarFullDirect::
notify_all() {

View File

@ -48,14 +48,13 @@ operator = (const MutexDebug &copy) {
//
// Also see MutexHolder.
////////////////////////////////////////////////////////////////////
INLINE bool MutexDebug::
INLINE void MutexDebug::
acquire(Thread *current_thread) const {
TAU_PROFILE("void MutexDebug::acquire(Thread *)", " ", TAU_USER);
nassertv(current_thread == Thread::get_current_thread());
_global_lock->acquire();
((MutexDebug *)this)->do_acquire();
_global_lock->release(current_thread);
return true;
((MutexDebug *)this)->do_acquire(current_thread);
_global_lock->release();
}
////////////////////////////////////////////////////////////////////
@ -68,7 +67,7 @@ acquire(Thread *current_thread) const {
INLINE bool MutexDebug::
try_acquire(Thread *current_thread) const {
TAU_PROFILE("void MutexDebug::acquire(Thread *)", " ", TAU_USER);
nassertv(current_thread == Thread::get_current_thread());
nassertr(current_thread == Thread::get_current_thread(), false);
_global_lock->acquire();
bool acquired = ((MutexDebug *)this)->do_try_acquire(current_thread);
_global_lock->release();

View File

@ -219,7 +219,7 @@ do_try_acquire(Thread *current_thread) {
pipeline_cat.error()
<< "Configure name-deleted-mutexes 1 to see the mutex name.\n";
}
return;
return false;
}
bool acquired = true;
@ -227,12 +227,12 @@ do_try_acquire(Thread *current_thread) {
// The mutex is not already locked by anyone. Lock it.
_locking_thread = current_thread;
++_lock_count;
nassertv(_lock_count == 1);
nassertr(_lock_count == 1, false);
} else if (_locking_thread == current_thread) {
// The mutex is already locked by this thread. Increment the lock
// count.
nassertv(_lock_count > 0);
nassertr(_lock_count > 0, false);
if (!_allow_recursion) {
ostringstream ostr;
ostr << *current_thread << " attempted to double-lock non-reentrant "

View File

@ -19,7 +19,7 @@
// Access: Published
// Description:
////////////////////////////////////////////////////////////////////
INLINE void Semaphore::
void Semaphore::
output(ostream &out) const {
MutexHolder holder(_lock);
out << "Semaphore, count = " << _count;

View File

@ -151,8 +151,10 @@ join() {
nassertv(_joinable);
if (_status == S_running) {
ThreadSimpleImpl *thread = _manager->get_current_thread();
_joining_threads.push_back(thread);
_manager->next_context();
if (thread != this) {
_joining_threads.push_back(thread);
_manager->next_context();
}
}
}