multiple vertex paging threads

This commit is contained in:
David Rose 2008-07-10 21:13:57 +00:00
parent 9fcac8b39c
commit 617f8c31df
6 changed files with 174 additions and 105 deletions

View File

@ -509,8 +509,8 @@ remove_all_windows() {
BamCache *cache = BamCache::get_global_ptr(); BamCache *cache = BamCache::get_global_ptr();
cache->flush_index(); cache->flush_index();
// And, hey, let's stop the vertex paging thread, if any. // And, hey, let's stop the vertex paging threads, if any.
VertexDataPage::stop_thread(); VertexDataPage::stop_threads();
// Well, and why not clean up all threads here? // Well, and why not clean up all threads here?
Thread::prepare_for_exit(); Thread::prepare_for_exit();

View File

@ -359,13 +359,13 @@ ConfigVariableInt vertex_data_small_size
"is deemed too small to pay the overhead of paging it in and out, " "is deemed too small to pay the overhead of paging it in and out, "
"and it is permanently retained resident.")); "and it is permanently retained resident."));
ConfigVariableBool vertex_data_threaded_paging ConfigVariableInt vertex_data_page_threads
("vertex-data-threaded-paging", true, ("vertex-data-page-threads", 1,
PRC_DESC("When this is true (and Panda has been compiled with thread " PRC_DESC("When this is nonzero (and Panda has been compiled with thread "
"support) then a sub-thread will be spawned to evict vertex pages " "support) then this number of sub-threads will be spawned to "
"to disk and read them back again. When this is false, this " "evict vertex pages to disk and read them back again. When this "
"work will be done in the main thread, which may introduce " "is 0, this work will be done in the main thread, which may "
"occasional random chugs in rendering.")); "introduce occasional random chugs in rendering."));
ConfigureFn(config_gobj) { ConfigureFn(config_gobj) {
BufferContext::init_type(); BufferContext::init_type();

View File

@ -88,7 +88,7 @@ extern EXPCL_PANDA_GOBJ ConfigVariableDouble default_keystone;
extern EXPCL_PANDA_GOBJ ConfigVariableFilename vertex_save_file_directory; extern EXPCL_PANDA_GOBJ ConfigVariableFilename vertex_save_file_directory;
extern EXPCL_PANDA_GOBJ ConfigVariableString vertex_save_file_prefix; extern EXPCL_PANDA_GOBJ ConfigVariableString vertex_save_file_prefix;
extern EXPCL_PANDA_GOBJ ConfigVariableInt vertex_data_small_size; extern EXPCL_PANDA_GOBJ ConfigVariableInt vertex_data_small_size;
extern EXPCL_PANDA_GOBJ ConfigVariableBool vertex_data_threaded_paging; extern EXPCL_PANDA_GOBJ ConfigVariableInt vertex_data_page_threads;
#endif #endif

View File

@ -147,18 +147,22 @@ save_to_disk() {
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function: VertexDataPage::has_thread // Function: VertexDataPage::get_num_threads
// Access: Published, Static // Access: Published, Static
// Description: Returns true if a thread has been spawned to service // Description: Returns the number of threads that have been spawned
// vertex paging requests, or false if it has not (which // to service vertex paging requests, or 0 if no threads
// may mean either that all paging requests will be // have been spawned (which may mean either that all
// handled by the main thread, or simply that no paging // paging requests will be handled by the main thread,
// requests have yet been issued). // or simply that no paging requests have yet been
// issued).
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
INLINE bool VertexDataPage:: INLINE int VertexDataPage::
has_thread() { get_num_threads() {
MutexHolder holder(_tlock); MutexHolder holder(_tlock);
return !_thread.is_null(); if (_thread_mgr == (PageThreadManager *)NULL) {
return 0;
}
return _thread_mgr->get_num_threads();
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
@ -237,17 +241,3 @@ INLINE size_t VertexDataPage::
round_up(size_t page_size) const { round_up(size_t page_size) const {
return ((page_size + _block_size - 1) / _block_size) * _block_size; return ((page_size + _block_size - 1) / _block_size) * _block_size;
} }
////////////////////////////////////////////////////////////////////
// Function: VertexDataPage::PageThread::Constructor
// Access: Public
// Description:
////////////////////////////////////////////////////////////////////
INLINE VertexDataPage::PageThread::
PageThread() :
Thread("VertexDataPage", "VertexDataPage"),
_shutdown(false),
_working_cvar(_tlock),
_pending_cvar(_tlock)
{
}

View File

@ -52,7 +52,7 @@ ConfigVariableInt max_disk_vertex_data
"that is allowed to be written to disk. Set it to -1 for no " "that is allowed to be written to disk. Set it to -1 for no "
"limit.")); "limit."));
PT(VertexDataPage::PageThread) VertexDataPage::_thread; PT(VertexDataPage::PageThreadManager) VertexDataPage::_thread_mgr;
Mutex VertexDataPage::_tlock; Mutex VertexDataPage::_tlock;
SimpleLru VertexDataPage::_resident_lru("resident", max_resident_vertex_data); SimpleLru VertexDataPage::_resident_lru("resident", max_resident_vertex_data);
@ -140,8 +140,8 @@ VertexDataPage::
{ {
MutexHolder holder2(_tlock); MutexHolder holder2(_tlock);
if (_pending_ram_class != _ram_class) { if (_pending_ram_class != _ram_class) {
nassertv(_thread != (PageThread *)NULL); nassertv(_thread_mgr != (PageThreadManager *)NULL);
_thread->remove_page(this); _thread_mgr->remove_page(this);
} }
} }
@ -152,25 +152,25 @@ VertexDataPage::
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function: VertexDataPage::stop_thread // Function: VertexDataPage::stop_threads
// Access: Published, Static // Access: Published, Static
// Description: Call this to stop the paging thread, if it was // Description: Call this to stop the paging thread, if it was
// started. This may block until all of the thread's // started. This may block until all of the thread's
// pending tasks have been completed. // pending tasks have been completed.
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
void VertexDataPage:: void VertexDataPage::
stop_thread() { stop_threads() {
PT(PageThread) thread; PT(PageThreadManager) thread_mgr;
{ {
MutexHolder holder(_tlock); MutexHolder holder(_tlock);
thread = _thread; thread_mgr = _thread_mgr;
_thread.clear(); _thread_mgr.clear();
} }
if (thread != (PageThread *)NULL) { if (thread_mgr != (PageThreadManager *)NULL) {
gobj_cat.info() gobj_cat.info()
<< "Stopping vertex paging thread.\n"; << "Stopping vertex paging threads.\n";
thread->stop_thread(); thread_mgr->stop_threads();
} }
} }
@ -284,8 +284,8 @@ void VertexDataPage::
make_resident_now() { make_resident_now() {
MutexHolder holder(_tlock); MutexHolder holder(_tlock);
if (_pending_ram_class != _ram_class) { if (_pending_ram_class != _ram_class) {
nassertv(_thread != (PageThread *)NULL); nassertv(_thread_mgr != (PageThreadManager *)NULL);
_thread->remove_page(this); _thread_mgr->remove_page(this);
} }
make_resident(); make_resident();
@ -559,7 +559,8 @@ adjust_book_size() {
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
void VertexDataPage:: void VertexDataPage::
request_ram_class(RamClass ram_class) { request_ram_class(RamClass ram_class) {
if (!vertex_data_threaded_paging || !Thread::is_threading_supported()) { int num_threads = vertex_data_page_threads;
if (num_threads == 0 || !Thread::is_threading_supported()) {
// No threads. Do it immediately. // No threads. Do it immediately.
switch (ram_class) { switch (ram_class) {
case RC_resident: case RC_resident:
@ -582,15 +583,14 @@ request_ram_class(RamClass ram_class) {
} }
MutexHolder holder(_tlock); MutexHolder holder(_tlock);
if (_thread == (PageThread *)NULL) { if (_thread_mgr == (PageThreadManager *)NULL) {
// Allocate and start a new global thread. // Create the thread manager.
gobj_cat.info() gobj_cat.info()
<< "Spawning vertex paging thread.\n"; << "Spawning " << num_threads << " vertex paging threads.\n";
_thread = new PageThread; _thread_mgr = new PageThreadManager(num_threads);
_thread->start(TP_low, true);
} }
_thread->add_page(this, ram_class); _thread_mgr->add_page(this, ram_class);
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
@ -630,17 +630,37 @@ free_page_data(unsigned char *page_data, size_t page_size) const {
_alloc_pages_pcollector.sub_level_now(page_size); _alloc_pages_pcollector.sub_level_now(page_size);
memory_hook->mmap_free(page_data, page_size); memory_hook->mmap_free(page_data, page_size);
} }
////////////////////////////////////////////////////////////////////
// Function: VertexDataPage::PageThreadManager::Constructor
// Access: Public
// Description: Assumes _tlock is held.
////////////////////////////////////////////////////////////////////
VertexDataPage::PageThreadManager::
PageThreadManager(int num_threads) :
_shutdown(false),
_pending_cvar(_tlock)
{
_threads.reserve(num_threads);
for (int i = 0; i < num_threads; ++i) {
ostringstream name_strm;
name_strm << "VertexDataPage" << i;
PT(PageThread) thread = new PageThread(this, name_strm.str());
thread->start(TP_low, true);
_threads.push_back(thread);
}
}
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function: VertexDataPage::PageThread::add_page // Function: VertexDataPage::PageThreadManager::add_page
// Access: Public // Access: Public
// Description: Enqueues the indicated page on the thread to convert // Description: Enqueues the indicated page on the thread queue to
// it to the specified ram class. // convert it to the specified ram class.
// //
// It is assumed the page's lock is already held, and // It is assumed the page's lock is already held, and
// the thread's tlock is already held. // that _tlock is already held.
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
void VertexDataPage::PageThread:: void VertexDataPage::PageThreadManager::
add_page(VertexDataPage *page, RamClass ram_class) { add_page(VertexDataPage *page, RamClass ram_class) {
if (page->_pending_ram_class == ram_class) { if (page->_pending_ram_class == ram_class) {
// It's already queued. // It's already queued.
@ -671,26 +691,31 @@ add_page(VertexDataPage *page, RamClass ram_class) {
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function: VertexDataPage::PageThread::remove_page // Function: VertexDataPage::PageThreadManager::remove_page
// Access: Public // Access: Public
// Description: Dequeues the indicated page and removes it from the // Description: Dequeues the indicated page and removes it from the
// pending task list. // pending task list.
// //
// It is assumed the page's lock is already held, and // It is assumed the page's lock is already held, and
// the thread's tlock is already held. // that _tlock is already held.
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
void VertexDataPage::PageThread:: void VertexDataPage::PageThreadManager::
remove_page(VertexDataPage *page) { remove_page(VertexDataPage *page) {
nassertv(page != (VertexDataPage *)NULL); nassertv(page != (VertexDataPage *)NULL);
if (page == _working_page) {
// Oops, the thread is currently working on this one. We'll have PageThreads::iterator ti;
// to wait for the thread to finish. for (ti = _threads.begin(); ti != _threads.begin(); ++ti) {
page->_lock.release(); PageThread *thread = (*ti);
while (page == _working_page) { if (page == thread->_working_page) {
_working_cvar.wait(); // Oops, this thread is currently working on this one. We'll have
// to wait for the thread to finish.
page->_lock.release();
while (page == thread->_working_page) {
thread->_working_cvar.wait();
}
page->_lock.lock();
return;
} }
page->_lock.lock();
return;
} }
if (page->_pending_ram_class == RC_resident) { if (page->_pending_ram_class == RC_resident) {
@ -711,10 +736,56 @@ remove_page(VertexDataPage *page) {
page->mark_used_lru(_global_lru[page->_ram_class]); page->mark_used_lru(_global_lru[page->_ram_class]);
} }
////////////////////////////////////////////////////////////////////
// Function: VertexDataPage::PageThreadManager::get_num_threads
// Access: Public
// Description: Returns the number of threads active on the thread
// manager. Assumes _tlock is held.
////////////////////////////////////////////////////////////////////
int VertexDataPage::PageThreadManager::
get_num_threads() const {
return (int)_threads.size();
}
////////////////////////////////////////////////////////////////////
// Function: VertexDataPage::PageThreadManager::stop_threads
// Access: Public
// Description: Signals all the threads to stop and waits for them.
// Does not return until the threads have finished.
// Assumes _tlock is *not* held.
////////////////////////////////////////////////////////////////////
void VertexDataPage::PageThreadManager::
stop_threads() {
{
MutexHolder holder(_tlock);
_shutdown = true;
_pending_cvar.signal_all();
}
PageThreads::iterator ti;
for (ti = _threads.begin(); ti != _threads.begin(); ++ti) {
PageThread *thread = (*ti);
thread->join();
}
}
////////////////////////////////////////////////////////////////////
// Function: VertexDataPage::PageThread::Constructor
// Access: Public
// Description:
////////////////////////////////////////////////////////////////////
VertexDataPage::PageThread::
PageThread(PageThreadManager *manager, const string &name) :
Thread(name, name),
_manager(manager),
_working_cvar(_tlock)
{
}
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function: VertexDataPage::PageThread::thread_main // Function: VertexDataPage::PageThread::thread_main
// Access: Protected, Virtual // Access: Protected, Virtual
// Description: The main processing loop for the sub-thread. // Description: The main processing loop for each sub-thread.
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
void VertexDataPage::PageThread:: void VertexDataPage::PageThread::
thread_main() { thread_main() {
@ -723,22 +794,23 @@ thread_main() {
while (true) { while (true) {
PStatClient::thread_tick(get_sync_name()); PStatClient::thread_tick(get_sync_name());
while (_pending_reads.empty() && _pending_writes.empty()) { while (_manager->_pending_reads.empty() &&
if (_shutdown) { _manager->_pending_writes.empty()) {
if (_manager->_shutdown) {
_tlock.release(); _tlock.release();
return; return;
} }
PStatTimer timer(_thread_wait_pcollector); PStatTimer timer(_thread_wait_pcollector);
_pending_cvar.wait(); _manager->_pending_cvar.wait();
} }
// Reads always have priority. // Reads always have priority.
if (!_pending_reads.empty()) { if (!_manager->_pending_reads.empty()) {
_working_page = _pending_reads.front(); _working_page = _manager->_pending_reads.front();
_pending_reads.pop_front(); _manager->_pending_reads.pop_front();
} else { } else {
_working_page = _pending_writes.front(); _working_page = _manager->_pending_writes.front();
_pending_writes.pop_front(); _manager->_pending_writes.pop_front();
} }
RamClass ram_class = _working_page->_pending_ram_class; RamClass ram_class = _working_page->_pending_ram_class;
@ -772,19 +844,3 @@ thread_main() {
Thread::consider_yield(); Thread::consider_yield();
} }
} }
////////////////////////////////////////////////////////////////////
// Function: VertexDataPage::PageThread::stop_thread
// Access: Public
// Description: Signals the thread to stop and waits for it. Does
// not return until the thread has finished.
////////////////////////////////////////////////////////////////////
void VertexDataPage::PageThread::
stop_thread() {
{
MutexHolder holder(_tlock);
_shutdown = true;
_pending_cvar.signal();
}
join();
}

View File

@ -22,6 +22,7 @@
#include "vertexDataSaveFile.h" #include "vertexDataSaveFile.h"
#include "pmutex.h" #include "pmutex.h"
#include "conditionVar.h" #include "conditionVar.h"
#include "conditionVarFull.h"
#include "thread.h" #include "thread.h"
#include "mutexHolder.h" #include "mutexHolder.h"
#include "pdeque.h" #include "pdeque.h"
@ -69,8 +70,8 @@ PUBLISHED:
INLINE bool save_to_disk(); INLINE bool save_to_disk();
INLINE static bool has_thread(); INLINE static int get_num_threads();
static void stop_thread(); static void stop_threads();
public: public:
INLINE unsigned char *get_page_data(bool force); INLINE unsigned char *get_page_data(bool force);
@ -106,27 +107,49 @@ private:
typedef pdeque<VertexDataPage *> PendingPages; typedef pdeque<VertexDataPage *> PendingPages;
class PageThreadManager;
class PageThread : public Thread { class PageThread : public Thread {
public: public:
INLINE PageThread(); PageThread(PageThreadManager *manager, const string &name);
void add_page(VertexDataPage *page, RamClass ram_class);
void remove_page(VertexDataPage *page);
void stop_thread();
protected: protected:
virtual void thread_main(); virtual void thread_main();
private: private:
PageThreadManager *_manager;
VertexDataPage *_working_page; VertexDataPage *_working_page;
// Signaled when _working_page is set to NULL after finishing a
// task.
ConditionVar _working_cvar;
friend class PageThreadManager;
};
typedef pvector<PT(PageThread) > PageThreads;
class PageThreadManager : public ReferenceCount {
public:
PageThreadManager(int num_threads);
void add_page(VertexDataPage *page, RamClass ram_class);
void remove_page(VertexDataPage *page);
int get_num_threads() const;
void stop_threads();
private:
PendingPages _pending_writes; PendingPages _pending_writes;
PendingPages _pending_reads; PendingPages _pending_reads;
bool _shutdown; bool _shutdown;
ConditionVar _working_cvar;
ConditionVar _pending_cvar; // Signaled when anything new is added to either of the above
// queues, or when _shutdown is set true. This wakes up any
// pending thread.
ConditionVarFull _pending_cvar;
PageThreads _threads;
friend class PageThread;
}; };
static PT(PageThread) _thread;
static Mutex _tlock; // Protects the thread members. static PT(PageThreadManager) _thread_mgr;
static Mutex _tlock; // Protects _thread_mgr and all of its members.
unsigned char *_page_data; unsigned char *_page_data;
size_t _size, _allocated_size, _uncompressed_size; size_t _size, _allocated_size, _uncompressed_size;