better handling of thread::force_yield(), not to impact other threads

This commit is contained in:
David Rose 2009-01-23 02:39:36 +00:00
parent 89d4957646
commit 7db0cd974c
2 changed files with 92 additions and 35 deletions

View File

@ -98,9 +98,17 @@ void ThreadSimpleManager::
enqueue_ready(ThreadSimpleImpl *thread, bool volunteer) {
// We actually add it to _next_ready, so that we can tell when we
// have processed every thread in a given epoch.
_next_ready.push_back(thread);
if (volunteer) {
++_num_next_ready_volunteers;
if (!volunteer) {
_next_ready.push_back(thread);
} else {
// Unless it's a volunteer, in which case we actually put it to
// sleep for the duration of the timeslice, so it won't interfere
// with timeslice accounting for the remaining ready threads.
double now = get_current_time();
thread->_wake_time = now + _simple_thread_epoch_timeslice;
_volunteers.push_back(thread);
push_heap(_volunteers.begin(), _volunteers.end(), CompareStartTime());
}
}
@ -314,6 +322,7 @@ prepare_for_exit() {
}
kill_non_joinable(_sleeping);
kill_non_joinable(_volunteers);
next_context();
@ -393,20 +402,24 @@ write_status(ostream &out) const {
out << "Ready:";
FifoThreads::const_iterator ti;
Sleeping::const_iterator si;
for (ti = _ready.begin(); ti != _ready.end(); ++ti) {
out << " " << *(*ti)->_parent_obj;
}
for (ti = _next_ready.begin(); ti != _next_ready.end(); ++ti) {
out << " " << *(*ti)->_parent_obj;
}
for (si = _volunteers.begin(); si != _volunteers.end(); ++si) {
out << " " << *(*si)->_parent_obj;
}
out << "\n";
double now = get_current_time();
out << "Sleeping:";
// Copy and sort for convenience.
Sleeping s2 = _sleeping;
sort(s2.begin(), s2.end(), CompareStartTime());
Sleeping::const_iterator si;
for (si = s2.begin(); si != s2.end(); ++si) {
out << " " << *(*si)->_parent_obj << "(" << (*si)->_wake_time - now
<< "s)";
@ -497,8 +510,21 @@ choose_next_context() {
do_timeslice_accounting(_current_thread, now);
_current_thread = NULL;
if (!_sleeping.empty()) {
wake_sleepers(now);
if (!_sleeping.empty() || !_volunteers.empty()) {
if (_ready.empty() && _next_ready.empty()) {
// All of our threads are currently sleeping. Therefore, wake
// the volunteer(s) immediately.
if (!_volunteers.empty()) {
wake_all_sleepers(_volunteers);
} else {
// No volunteers. Sleep the whole process.
system_yield();
now = get_current_time();
}
}
wake_sleepers(_sleeping, now);
wake_sleepers(_volunteers, now);
}
bool new_epoch = !_ready.empty() && _next_ready.empty();
@ -509,9 +535,7 @@ choose_next_context() {
while (_ready.empty()) {
if (!_next_ready.empty()) {
// We've finished an epoch.
bool all_volunteers = (_num_next_ready_volunteers == (int)_next_ready.size());
_ready.swap(_next_ready);
_num_next_ready_volunteers = 0;
if (new_epoch && !_tick_records.empty()) {
// Pop the oldest timeslice record off when we finish an
@ -526,18 +550,19 @@ choose_next_context() {
nassertv(record._thread->_run_ticks >= record._tick_count);
record._thread->_run_ticks -= record._tick_count;
_tick_records.pop_front();
} else {
// Otherwise, we're legitimately at the end of an epoch.
if (all_volunteers) {
// All of our non-blocked, non-sleeping threads have
// voluntarily yielded. Therefore, yield the whole
// process.
system_yield();
}
}
new_epoch = true;
} else if (!_volunteers.empty()) {
// There are some volunteers. Wake them. Also wake any
// sleepers that need it.
if (thread_cat->is_debug()) {
thread_cat.debug()
<< "Waking volunteers.\n";
}
wake_all_sleepers(_volunteers);
wake_sleepers(_sleeping, now);
} else if (!_sleeping.empty()) {
// All threads are sleeping.
double wait = _sleeping.front()->_wake_time - now;
@ -549,7 +574,8 @@ choose_next_context() {
system_sleep(wait);
}
now = get_current_time();
wake_sleepers(now);
wake_sleepers(_sleeping, now);
wake_sleepers(_volunteers, now);
} else {
// No threads are ready!
@ -609,7 +635,8 @@ choose_next_context() {
thread_cat.debug()
<< "Switching to " << *_current_thread->_parent_obj
<< " for " << timeslice << " s ("
<< _ready.size() + _next_ready.size()
<< _ready.size() << " + " << _next_ready.size()
<< " + " << _volunteers.size()
<< " other threads ready, " << blocked_count
<< " blocked, " << _sleeping.size() << " sleeping)\n";
}
@ -667,11 +694,27 @@ do_timeslice_accounting(ThreadSimpleImpl *thread, double now) {
// queue to the ready queue.
////////////////////////////////////////////////////////////////////
void ThreadSimpleManager::
wake_sleepers(double now) {
while (!_sleeping.empty() && _sleeping.front()->_wake_time <= now) {
ThreadSimpleImpl *thread = _sleeping.front();
pop_heap(_sleeping.begin(), _sleeping.end(), CompareStartTime());
_sleeping.pop_back();
wake_sleepers(ThreadSimpleManager::Sleeping &sleepers, double now) {
while (!sleepers.empty() && sleepers.front()->_wake_time <= now) {
ThreadSimpleImpl *thread = sleepers.front();
pop_heap(sleepers.begin(), sleepers.end(), CompareStartTime());
sleepers.pop_back();
_ready.push_back(thread);
}
}
////////////////////////////////////////////////////////////////////
// Function: ThreadSimpleManager::wake_all_sleepers
// Access: Private
// Description: Moves all threads from the indicated sleeping queue
// to the ready queue, regardless of wake time.
////////////////////////////////////////////////////////////////////
void ThreadSimpleManager::
wake_all_sleepers(ThreadSimpleManager::Sleeping &sleepers) {
while (!sleepers.empty()) {
ThreadSimpleImpl *thread = sleepers.front();
pop_heap(sleepers.begin(), sleepers.end(), CompareStartTime());
sleepers.pop_back();
_ready.push_back(thread);
}
}

View File

@ -84,12 +84,18 @@ public:
private:
static void init_pointers();
typedef pdeque<ThreadSimpleImpl *> FifoThreads;
typedef pvector<ThreadSimpleImpl *> Sleeping;
static void st_choose_next_context(void *data);
void choose_next_context();
void do_timeslice_accounting(ThreadSimpleImpl *thread, double now);
void wake_sleepers(double now);
void wake_sleepers(Sleeping &sleepers, double now);
void wake_all_sleepers(Sleeping &sleepers);
void report_deadlock();
double determine_timeslice(ThreadSimpleImpl *chosen_thread);
void kill_non_joinable(FifoThreads &threads);
void kill_non_joinable(Sleeping &threads);
// STL function object to sort the priority queue of sleeping threads.
class CompareStartTime {
@ -97,12 +103,6 @@ private:
INLINE bool operator ()(ThreadSimpleImpl *a, ThreadSimpleImpl *b) const;
};
typedef pdeque<ThreadSimpleImpl *> FifoThreads;
typedef pvector<ThreadSimpleImpl *> Sleeping;
void kill_non_joinable(FifoThreads &threads);
void kill_non_joinable(Sleeping &threads);
public:
// Defined within the class to avoid static-init ordering problems.
ConfigVariableDouble _simple_thread_epoch_timeslice;
@ -115,17 +115,31 @@ public:
private:
ThreadSimpleImpl *volatile _current_thread;
// FIFO list of ready threads.
// The list of ready threads: threads that are ready to execute
// right now.
FifoThreads _ready;
FifoThreads _next_ready;
int _num_next_ready_volunteers;
// The list of threads that are ready, but will not be executed
// until next epoch (for instance, because they exceeded their
// timeslice budget this epoch).
FifoThreads _next_ready;
// The list of threads that are blocked on some ConditionVar or
// Mutex.
typedef pmap<BlockerSimple *, FifoThreads> Blocked;
Blocked _blocked;
// Priority queue (partially-ordered heap) based on wakeup time.
// Priority queue (partially-ordered heap) of sleeping threads,
// based on wakeup time.
Sleeping _sleeping;
// Priority queue (partially-ordered heap) of volunteer threads,
// based on wakeup time. This are threads that have voluntarily
// yielded a timeslice. They are treated the same as sleeping
// threads, unless all threads are sleeping.
Sleeping _volunteers;
// Threads which have finished execution and are awaiting cleanup.
FifoThreads _finished;
ThreadSimpleImpl *_waiting_for_exit;