From 7db0cd974c17e1b90e758e639533b92a7f1ed4cb Mon Sep 17 00:00:00 2001 From: David Rose Date: Fri, 23 Jan 2009 02:39:36 +0000 Subject: [PATCH] better handling of thread::force_yield(), not to impact other threads --- panda/src/pipeline/threadSimpleManager.cxx | 91 ++++++++++++++++------ panda/src/pipeline/threadSimpleManager.h | 36 ++++++--- 2 files changed, 92 insertions(+), 35 deletions(-) diff --git a/panda/src/pipeline/threadSimpleManager.cxx b/panda/src/pipeline/threadSimpleManager.cxx index 31a0c43bfa..b3b0a1da57 100644 --- a/panda/src/pipeline/threadSimpleManager.cxx +++ b/panda/src/pipeline/threadSimpleManager.cxx @@ -98,9 +98,17 @@ void ThreadSimpleManager:: enqueue_ready(ThreadSimpleImpl *thread, bool volunteer) { // We actually add it to _next_ready, so that we can tell when we // have processed every thread in a given epoch. - _next_ready.push_back(thread); - if (volunteer) { - ++_num_next_ready_volunteers; + if (!volunteer) { + _next_ready.push_back(thread); + + } else { + // Unless it's a volunteer, in which case we actually put it to + // sleep for the duration of the timeslice, so it won't interfere + // with timeslice accounting for the remaining ready threads. + double now = get_current_time(); + thread->_wake_time = now + _simple_thread_epoch_timeslice; + _volunteers.push_back(thread); + push_heap(_volunteers.begin(), _volunteers.end(), CompareStartTime()); } } @@ -314,6 +322,7 @@ prepare_for_exit() { } kill_non_joinable(_sleeping); + kill_non_joinable(_volunteers); next_context(); @@ -393,20 +402,24 @@ write_status(ostream &out) const { out << "Ready:"; FifoThreads::const_iterator ti; + Sleeping::const_iterator si; for (ti = _ready.begin(); ti != _ready.end(); ++ti) { out << " " << *(*ti)->_parent_obj; } for (ti = _next_ready.begin(); ti != _next_ready.end(); ++ti) { out << " " << *(*ti)->_parent_obj; } + for (si = _volunteers.begin(); si != _volunteers.end(); ++si) { + out << " " << *(*si)->_parent_obj; + } out << "\n"; double now = get_current_time(); + out << "Sleeping:"; // Copy and sort for convenience. Sleeping s2 = _sleeping; sort(s2.begin(), s2.end(), CompareStartTime()); - Sleeping::const_iterator si; for (si = s2.begin(); si != s2.end(); ++si) { out << " " << *(*si)->_parent_obj << "(" << (*si)->_wake_time - now << "s)"; @@ -497,8 +510,21 @@ choose_next_context() { do_timeslice_accounting(_current_thread, now); _current_thread = NULL; - if (!_sleeping.empty()) { - wake_sleepers(now); + if (!_sleeping.empty() || !_volunteers.empty()) { + if (_ready.empty() && _next_ready.empty()) { + // All of our threads are currently sleeping. Therefore, wake + // the volunteer(s) immediately. + if (!_volunteers.empty()) { + wake_all_sleepers(_volunteers); + + } else { + // No volunteers. Sleep the whole process. + system_yield(); + now = get_current_time(); + } + } + wake_sleepers(_sleeping, now); + wake_sleepers(_volunteers, now); } bool new_epoch = !_ready.empty() && _next_ready.empty(); @@ -509,9 +535,7 @@ choose_next_context() { while (_ready.empty()) { if (!_next_ready.empty()) { // We've finished an epoch. - bool all_volunteers = (_num_next_ready_volunteers == (int)_next_ready.size()); _ready.swap(_next_ready); - _num_next_ready_volunteers = 0; if (new_epoch && !_tick_records.empty()) { // Pop the oldest timeslice record off when we finish an @@ -526,18 +550,19 @@ choose_next_context() { nassertv(record._thread->_run_ticks >= record._tick_count); record._thread->_run_ticks -= record._tick_count; _tick_records.pop_front(); - - } else { - // Otherwise, we're legitimately at the end of an epoch. - if (all_volunteers) { - // All of our non-blocked, non-sleeping threads have - // voluntarily yielded. Therefore, yield the whole - // process. - system_yield(); - } } new_epoch = true; + } else if (!_volunteers.empty()) { + // There are some volunteers. Wake them. Also wake any + // sleepers that need it. + if (thread_cat->is_debug()) { + thread_cat.debug() + << "Waking volunteers.\n"; + } + wake_all_sleepers(_volunteers); + wake_sleepers(_sleeping, now); + } else if (!_sleeping.empty()) { // All threads are sleeping. double wait = _sleeping.front()->_wake_time - now; @@ -549,7 +574,8 @@ choose_next_context() { system_sleep(wait); } now = get_current_time(); - wake_sleepers(now); + wake_sleepers(_sleeping, now); + wake_sleepers(_volunteers, now); } else { // No threads are ready! @@ -609,7 +635,8 @@ choose_next_context() { thread_cat.debug() << "Switching to " << *_current_thread->_parent_obj << " for " << timeslice << " s (" - << _ready.size() + _next_ready.size() + << _ready.size() << " + " << _next_ready.size() + << " + " << _volunteers.size() << " other threads ready, " << blocked_count << " blocked, " << _sleeping.size() << " sleeping)\n"; } @@ -667,11 +694,27 @@ do_timeslice_accounting(ThreadSimpleImpl *thread, double now) { // queue to the ready queue. //////////////////////////////////////////////////////////////////// void ThreadSimpleManager:: -wake_sleepers(double now) { - while (!_sleeping.empty() && _sleeping.front()->_wake_time <= now) { - ThreadSimpleImpl *thread = _sleeping.front(); - pop_heap(_sleeping.begin(), _sleeping.end(), CompareStartTime()); - _sleeping.pop_back(); +wake_sleepers(ThreadSimpleManager::Sleeping &sleepers, double now) { + while (!sleepers.empty() && sleepers.front()->_wake_time <= now) { + ThreadSimpleImpl *thread = sleepers.front(); + pop_heap(sleepers.begin(), sleepers.end(), CompareStartTime()); + sleepers.pop_back(); + _ready.push_back(thread); + } +} + +//////////////////////////////////////////////////////////////////// +// Function: ThreadSimpleManager::wake_all_sleepers +// Access: Private +// Description: Moves all threads from the indicated sleeping queue +// to the ready queue, regardless of wake time. +//////////////////////////////////////////////////////////////////// +void ThreadSimpleManager:: +wake_all_sleepers(ThreadSimpleManager::Sleeping &sleepers) { + while (!sleepers.empty()) { + ThreadSimpleImpl *thread = sleepers.front(); + pop_heap(sleepers.begin(), sleepers.end(), CompareStartTime()); + sleepers.pop_back(); _ready.push_back(thread); } } diff --git a/panda/src/pipeline/threadSimpleManager.h b/panda/src/pipeline/threadSimpleManager.h index ee7f7f5e54..873bb4ad08 100644 --- a/panda/src/pipeline/threadSimpleManager.h +++ b/panda/src/pipeline/threadSimpleManager.h @@ -84,12 +84,18 @@ public: private: static void init_pointers(); + typedef pdeque FifoThreads; + typedef pvector Sleeping; + static void st_choose_next_context(void *data); void choose_next_context(); void do_timeslice_accounting(ThreadSimpleImpl *thread, double now); - void wake_sleepers(double now); + void wake_sleepers(Sleeping &sleepers, double now); + void wake_all_sleepers(Sleeping &sleepers); void report_deadlock(); double determine_timeslice(ThreadSimpleImpl *chosen_thread); + void kill_non_joinable(FifoThreads &threads); + void kill_non_joinable(Sleeping &threads); // STL function object to sort the priority queue of sleeping threads. class CompareStartTime { @@ -97,12 +103,6 @@ private: INLINE bool operator ()(ThreadSimpleImpl *a, ThreadSimpleImpl *b) const; }; - typedef pdeque FifoThreads; - typedef pvector Sleeping; - - void kill_non_joinable(FifoThreads &threads); - void kill_non_joinable(Sleeping &threads); - public: // Defined within the class to avoid static-init ordering problems. ConfigVariableDouble _simple_thread_epoch_timeslice; @@ -115,17 +115,31 @@ public: private: ThreadSimpleImpl *volatile _current_thread; - // FIFO list of ready threads. + // The list of ready threads: threads that are ready to execute + // right now. FifoThreads _ready; - FifoThreads _next_ready; - int _num_next_ready_volunteers; + // The list of threads that are ready, but will not be executed + // until next epoch (for instance, because they exceeded their + // timeslice budget this epoch). + FifoThreads _next_ready; + + // The list of threads that are blocked on some ConditionVar or + // Mutex. typedef pmap Blocked; Blocked _blocked; - // Priority queue (partially-ordered heap) based on wakeup time. + // Priority queue (partially-ordered heap) of sleeping threads, + // based on wakeup time. Sleeping _sleeping; + // Priority queue (partially-ordered heap) of volunteer threads, + // based on wakeup time. This are threads that have voluntarily + // yielded a timeslice. They are treated the same as sleeping + // threads, unless all threads are sleeping. + Sleeping _volunteers; + + // Threads which have finished execution and are awaiting cleanup. FifoThreads _finished; ThreadSimpleImpl *_waiting_for_exit;