From 617f8c31df3177f2405561bef4ceaa30fae9922b Mon Sep 17 00:00:00 2001 From: David Rose Date: Thu, 10 Jul 2008 21:13:57 +0000 Subject: [PATCH] multiple vertex paging threads --- panda/src/display/graphicsEngine.cxx | 4 +- panda/src/gobj/config_gobj.cxx | 14 +-- panda/src/gobj/config_gobj.h | 2 +- panda/src/gobj/vertexDataPage.I | 36 ++---- panda/src/gobj/vertexDataPage.cxx | 178 ++++++++++++++++++--------- panda/src/gobj/vertexDataPage.h | 45 +++++-- 6 files changed, 174 insertions(+), 105 deletions(-) diff --git a/panda/src/display/graphicsEngine.cxx b/panda/src/display/graphicsEngine.cxx index eecbd3c477..ac282f5dad 100644 --- a/panda/src/display/graphicsEngine.cxx +++ b/panda/src/display/graphicsEngine.cxx @@ -509,8 +509,8 @@ remove_all_windows() { BamCache *cache = BamCache::get_global_ptr(); cache->flush_index(); - // And, hey, let's stop the vertex paging thread, if any. - VertexDataPage::stop_thread(); + // And, hey, let's stop the vertex paging threads, if any. + VertexDataPage::stop_threads(); // Well, and why not clean up all threads here? Thread::prepare_for_exit(); diff --git a/panda/src/gobj/config_gobj.cxx b/panda/src/gobj/config_gobj.cxx index 3f8d9afc17..22deff6b44 100644 --- a/panda/src/gobj/config_gobj.cxx +++ b/panda/src/gobj/config_gobj.cxx @@ -359,13 +359,13 @@ ConfigVariableInt vertex_data_small_size "is deemed too small to pay the overhead of paging it in and out, " "and it is permanently retained resident.")); -ConfigVariableBool vertex_data_threaded_paging -("vertex-data-threaded-paging", true, - PRC_DESC("When this is true (and Panda has been compiled with thread " - "support) then a sub-thread will be spawned to evict vertex pages " - "to disk and read them back again. When this is false, this " - "work will be done in the main thread, which may introduce " - "occasional random chugs in rendering.")); +ConfigVariableInt vertex_data_page_threads +("vertex-data-page-threads", 1, + PRC_DESC("When this is nonzero (and Panda has been compiled with thread " + "support) then this number of sub-threads will be spawned to " + "evict vertex pages to disk and read them back again. When this " + "is 0, this work will be done in the main thread, which may " + "introduce occasional random chugs in rendering.")); ConfigureFn(config_gobj) { BufferContext::init_type(); diff --git a/panda/src/gobj/config_gobj.h b/panda/src/gobj/config_gobj.h index f05ce3d6a0..5f06623e1c 100644 --- a/panda/src/gobj/config_gobj.h +++ b/panda/src/gobj/config_gobj.h @@ -88,7 +88,7 @@ extern EXPCL_PANDA_GOBJ ConfigVariableDouble default_keystone; extern EXPCL_PANDA_GOBJ ConfigVariableFilename vertex_save_file_directory; extern EXPCL_PANDA_GOBJ ConfigVariableString vertex_save_file_prefix; extern EXPCL_PANDA_GOBJ ConfigVariableInt vertex_data_small_size; -extern EXPCL_PANDA_GOBJ ConfigVariableBool vertex_data_threaded_paging; +extern EXPCL_PANDA_GOBJ ConfigVariableInt vertex_data_page_threads; #endif diff --git a/panda/src/gobj/vertexDataPage.I b/panda/src/gobj/vertexDataPage.I index 41ca0a64a8..7804d8d2e9 100644 --- a/panda/src/gobj/vertexDataPage.I +++ b/panda/src/gobj/vertexDataPage.I @@ -147,18 +147,22 @@ save_to_disk() { } //////////////////////////////////////////////////////////////////// -// Function: VertexDataPage::has_thread +// Function: VertexDataPage::get_num_threads // Access: Published, Static -// Description: Returns true if a thread has been spawned to service -// vertex paging requests, or false if it has not (which -// may mean either that all paging requests will be -// handled by the main thread, or simply that no paging -// requests have yet been issued). +// Description: Returns the number of threads that have been spawned +// to service vertex paging requests, or 0 if no threads +// have been spawned (which may mean either that all +// paging requests will be handled by the main thread, +// or simply that no paging requests have yet been +// issued). //////////////////////////////////////////////////////////////////// -INLINE bool VertexDataPage:: -has_thread() { +INLINE int VertexDataPage:: +get_num_threads() { MutexHolder holder(_tlock); - return !_thread.is_null(); + if (_thread_mgr == (PageThreadManager *)NULL) { + return 0; + } + return _thread_mgr->get_num_threads(); } //////////////////////////////////////////////////////////////////// @@ -237,17 +241,3 @@ INLINE size_t VertexDataPage:: round_up(size_t page_size) const { return ((page_size + _block_size - 1) / _block_size) * _block_size; } - -//////////////////////////////////////////////////////////////////// -// Function: VertexDataPage::PageThread::Constructor -// Access: Public -// Description: -//////////////////////////////////////////////////////////////////// -INLINE VertexDataPage::PageThread:: -PageThread() : - Thread("VertexDataPage", "VertexDataPage"), - _shutdown(false), - _working_cvar(_tlock), - _pending_cvar(_tlock) -{ -} diff --git a/panda/src/gobj/vertexDataPage.cxx b/panda/src/gobj/vertexDataPage.cxx index 60c5b33683..f01386098a 100644 --- a/panda/src/gobj/vertexDataPage.cxx +++ b/panda/src/gobj/vertexDataPage.cxx @@ -52,7 +52,7 @@ ConfigVariableInt max_disk_vertex_data "that is allowed to be written to disk. Set it to -1 for no " "limit.")); -PT(VertexDataPage::PageThread) VertexDataPage::_thread; +PT(VertexDataPage::PageThreadManager) VertexDataPage::_thread_mgr; Mutex VertexDataPage::_tlock; SimpleLru VertexDataPage::_resident_lru("resident", max_resident_vertex_data); @@ -140,8 +140,8 @@ VertexDataPage:: { MutexHolder holder2(_tlock); if (_pending_ram_class != _ram_class) { - nassertv(_thread != (PageThread *)NULL); - _thread->remove_page(this); + nassertv(_thread_mgr != (PageThreadManager *)NULL); + _thread_mgr->remove_page(this); } } @@ -152,25 +152,25 @@ VertexDataPage:: } //////////////////////////////////////////////////////////////////// -// Function: VertexDataPage::stop_thread +// Function: VertexDataPage::stop_threads // Access: Published, Static // Description: Call this to stop the paging thread, if it was // started. This may block until all of the thread's // pending tasks have been completed. //////////////////////////////////////////////////////////////////// void VertexDataPage:: -stop_thread() { - PT(PageThread) thread; +stop_threads() { + PT(PageThreadManager) thread_mgr; { MutexHolder holder(_tlock); - thread = _thread; - _thread.clear(); + thread_mgr = _thread_mgr; + _thread_mgr.clear(); } - if (thread != (PageThread *)NULL) { + if (thread_mgr != (PageThreadManager *)NULL) { gobj_cat.info() - << "Stopping vertex paging thread.\n"; - thread->stop_thread(); + << "Stopping vertex paging threads.\n"; + thread_mgr->stop_threads(); } } @@ -284,8 +284,8 @@ void VertexDataPage:: make_resident_now() { MutexHolder holder(_tlock); if (_pending_ram_class != _ram_class) { - nassertv(_thread != (PageThread *)NULL); - _thread->remove_page(this); + nassertv(_thread_mgr != (PageThreadManager *)NULL); + _thread_mgr->remove_page(this); } make_resident(); @@ -559,7 +559,8 @@ adjust_book_size() { //////////////////////////////////////////////////////////////////// void VertexDataPage:: request_ram_class(RamClass ram_class) { - if (!vertex_data_threaded_paging || !Thread::is_threading_supported()) { + int num_threads = vertex_data_page_threads; + if (num_threads == 0 || !Thread::is_threading_supported()) { // No threads. Do it immediately. switch (ram_class) { case RC_resident: @@ -582,15 +583,14 @@ request_ram_class(RamClass ram_class) { } MutexHolder holder(_tlock); - if (_thread == (PageThread *)NULL) { - // Allocate and start a new global thread. + if (_thread_mgr == (PageThreadManager *)NULL) { + // Create the thread manager. gobj_cat.info() - << "Spawning vertex paging thread.\n"; - _thread = new PageThread; - _thread->start(TP_low, true); + << "Spawning " << num_threads << " vertex paging threads.\n"; + _thread_mgr = new PageThreadManager(num_threads); } - _thread->add_page(this, ram_class); + _thread_mgr->add_page(this, ram_class); } //////////////////////////////////////////////////////////////////// @@ -630,17 +630,37 @@ free_page_data(unsigned char *page_data, size_t page_size) const { _alloc_pages_pcollector.sub_level_now(page_size); memory_hook->mmap_free(page_data, page_size); } + +//////////////////////////////////////////////////////////////////// +// Function: VertexDataPage::PageThreadManager::Constructor +// Access: Public +// Description: Assumes _tlock is held. +//////////////////////////////////////////////////////////////////// +VertexDataPage::PageThreadManager:: +PageThreadManager(int num_threads) : + _shutdown(false), + _pending_cvar(_tlock) +{ + _threads.reserve(num_threads); + for (int i = 0; i < num_threads; ++i) { + ostringstream name_strm; + name_strm << "VertexDataPage" << i; + PT(PageThread) thread = new PageThread(this, name_strm.str()); + thread->start(TP_low, true); + _threads.push_back(thread); + } +} //////////////////////////////////////////////////////////////////// -// Function: VertexDataPage::PageThread::add_page +// Function: VertexDataPage::PageThreadManager::add_page // Access: Public -// Description: Enqueues the indicated page on the thread to convert -// it to the specified ram class. +// Description: Enqueues the indicated page on the thread queue to +// convert it to the specified ram class. // // It is assumed the page's lock is already held, and -// the thread's tlock is already held. +// that _tlock is already held. //////////////////////////////////////////////////////////////////// -void VertexDataPage::PageThread:: +void VertexDataPage::PageThreadManager:: add_page(VertexDataPage *page, RamClass ram_class) { if (page->_pending_ram_class == ram_class) { // It's already queued. @@ -671,26 +691,31 @@ add_page(VertexDataPage *page, RamClass ram_class) { } //////////////////////////////////////////////////////////////////// -// Function: VertexDataPage::PageThread::remove_page +// Function: VertexDataPage::PageThreadManager::remove_page // Access: Public // Description: Dequeues the indicated page and removes it from the // pending task list. // // It is assumed the page's lock is already held, and -// the thread's tlock is already held. +// that _tlock is already held. //////////////////////////////////////////////////////////////////// -void VertexDataPage::PageThread:: +void VertexDataPage::PageThreadManager:: remove_page(VertexDataPage *page) { nassertv(page != (VertexDataPage *)NULL); - if (page == _working_page) { - // Oops, the thread is currently working on this one. We'll have - // to wait for the thread to finish. - page->_lock.release(); - while (page == _working_page) { - _working_cvar.wait(); + + PageThreads::iterator ti; + for (ti = _threads.begin(); ti != _threads.begin(); ++ti) { + PageThread *thread = (*ti); + if (page == thread->_working_page) { + // Oops, this thread is currently working on this one. We'll have + // to wait for the thread to finish. + page->_lock.release(); + while (page == thread->_working_page) { + thread->_working_cvar.wait(); + } + page->_lock.lock(); + return; } - page->_lock.lock(); - return; } if (page->_pending_ram_class == RC_resident) { @@ -711,10 +736,56 @@ remove_page(VertexDataPage *page) { page->mark_used_lru(_global_lru[page->_ram_class]); } +//////////////////////////////////////////////////////////////////// +// Function: VertexDataPage::PageThreadManager::get_num_threads +// Access: Public +// Description: Returns the number of threads active on the thread +// manager. Assumes _tlock is held. +//////////////////////////////////////////////////////////////////// +int VertexDataPage::PageThreadManager:: +get_num_threads() const { + return (int)_threads.size(); +} + +//////////////////////////////////////////////////////////////////// +// Function: VertexDataPage::PageThreadManager::stop_threads +// Access: Public +// Description: Signals all the threads to stop and waits for them. +// Does not return until the threads have finished. +// Assumes _tlock is *not* held. +//////////////////////////////////////////////////////////////////// +void VertexDataPage::PageThreadManager:: +stop_threads() { + { + MutexHolder holder(_tlock); + _shutdown = true; + _pending_cvar.signal_all(); + } + + PageThreads::iterator ti; + for (ti = _threads.begin(); ti != _threads.begin(); ++ti) { + PageThread *thread = (*ti); + thread->join(); + } +} + +//////////////////////////////////////////////////////////////////// +// Function: VertexDataPage::PageThread::Constructor +// Access: Public +// Description: +//////////////////////////////////////////////////////////////////// +VertexDataPage::PageThread:: +PageThread(PageThreadManager *manager, const string &name) : + Thread(name, name), + _manager(manager), + _working_cvar(_tlock) +{ +} + //////////////////////////////////////////////////////////////////// // Function: VertexDataPage::PageThread::thread_main // Access: Protected, Virtual -// Description: The main processing loop for the sub-thread. +// Description: The main processing loop for each sub-thread. //////////////////////////////////////////////////////////////////// void VertexDataPage::PageThread:: thread_main() { @@ -723,22 +794,23 @@ thread_main() { while (true) { PStatClient::thread_tick(get_sync_name()); - while (_pending_reads.empty() && _pending_writes.empty()) { - if (_shutdown) { + while (_manager->_pending_reads.empty() && + _manager->_pending_writes.empty()) { + if (_manager->_shutdown) { _tlock.release(); return; } PStatTimer timer(_thread_wait_pcollector); - _pending_cvar.wait(); + _manager->_pending_cvar.wait(); } // Reads always have priority. - if (!_pending_reads.empty()) { - _working_page = _pending_reads.front(); - _pending_reads.pop_front(); + if (!_manager->_pending_reads.empty()) { + _working_page = _manager->_pending_reads.front(); + _manager->_pending_reads.pop_front(); } else { - _working_page = _pending_writes.front(); - _pending_writes.pop_front(); + _working_page = _manager->_pending_writes.front(); + _manager->_pending_writes.pop_front(); } RamClass ram_class = _working_page->_pending_ram_class; @@ -772,19 +844,3 @@ thread_main() { Thread::consider_yield(); } } - -//////////////////////////////////////////////////////////////////// -// Function: VertexDataPage::PageThread::stop_thread -// Access: Public -// Description: Signals the thread to stop and waits for it. Does -// not return until the thread has finished. -//////////////////////////////////////////////////////////////////// -void VertexDataPage::PageThread:: -stop_thread() { - { - MutexHolder holder(_tlock); - _shutdown = true; - _pending_cvar.signal(); - } - join(); -} diff --git a/panda/src/gobj/vertexDataPage.h b/panda/src/gobj/vertexDataPage.h index 6df4ae40a6..ee23458797 100644 --- a/panda/src/gobj/vertexDataPage.h +++ b/panda/src/gobj/vertexDataPage.h @@ -22,6 +22,7 @@ #include "vertexDataSaveFile.h" #include "pmutex.h" #include "conditionVar.h" +#include "conditionVarFull.h" #include "thread.h" #include "mutexHolder.h" #include "pdeque.h" @@ -69,8 +70,8 @@ PUBLISHED: INLINE bool save_to_disk(); - INLINE static bool has_thread(); - static void stop_thread(); + INLINE static int get_num_threads(); + static void stop_threads(); public: INLINE unsigned char *get_page_data(bool force); @@ -106,27 +107,49 @@ private: typedef pdeque PendingPages; + class PageThreadManager; class PageThread : public Thread { public: - INLINE PageThread(); - - void add_page(VertexDataPage *page, RamClass ram_class); - void remove_page(VertexDataPage *page); - void stop_thread(); + PageThread(PageThreadManager *manager, const string &name); protected: virtual void thread_main(); private: + PageThreadManager *_manager; VertexDataPage *_working_page; + + // Signaled when _working_page is set to NULL after finishing a + // task. + ConditionVar _working_cvar; + friend class PageThreadManager; + }; + typedef pvector PageThreads; + + class PageThreadManager : public ReferenceCount { + public: + PageThreadManager(int num_threads); + void add_page(VertexDataPage *page, RamClass ram_class); + void remove_page(VertexDataPage *page); + int get_num_threads() const; + void stop_threads(); + + private: PendingPages _pending_writes; PendingPages _pending_reads; bool _shutdown; - ConditionVar _working_cvar; - ConditionVar _pending_cvar; + + // Signaled when anything new is added to either of the above + // queues, or when _shutdown is set true. This wakes up any + // pending thread. + ConditionVarFull _pending_cvar; + + PageThreads _threads; + friend class PageThread; }; - static PT(PageThread) _thread; - static Mutex _tlock; // Protects the thread members. + + static PT(PageThreadManager) _thread_mgr; + static Mutex _tlock; // Protects _thread_mgr and all of its members. unsigned char *_page_data; size_t _size, _allocated_size, _uncompressed_size;