From bb530435e8afae09f36c0bcc606a7954a13ab943 Mon Sep 17 00:00:00 2001 From: rdb Date: Sat, 3 Dec 2022 22:20:23 +0100 Subject: [PATCH] pstats: Offload more work to separate PStats thread This improves performance of PStats client when a lot of data is being sent per frame. Also increase the value of pstats-max-queue-size, since otherwise we're constantly dropping frames when using multiple threads. --- .../glstuff/glGraphicsStateGuardian_src.cxx | 2 +- panda/src/pstatclient/config_pstatclient.cxx | 2 +- panda/src/pstatclient/pStatClientImpl.I | 9 ++ panda/src/pstatclient/pStatClientImpl.cxx | 120 +++++++++++++++++- panda/src/pstatclient/pStatClientImpl.h | 28 +++- panda/src/pstatclient/pStatThread.cxx | 4 +- panda/src/pstatclient/pStatThread.h | 2 +- 7 files changed, 158 insertions(+), 9 deletions(-) diff --git a/panda/src/glstuff/glGraphicsStateGuardian_src.cxx b/panda/src/glstuff/glGraphicsStateGuardian_src.cxx index 9a73b7805e..5cea01b782 100644 --- a/panda/src/glstuff/glGraphicsStateGuardian_src.cxx +++ b/panda/src/glstuff/glGraphicsStateGuardian_src.cxx @@ -4522,7 +4522,7 @@ end_frame_timing(const FrameTiming &frame) { // The end time of the last collector is implicitly the frame's end time. 
frame_data.add_stop(0, frame_data.get_end()); - gpu_thread.add_frame(frame._frame_number, frame_data); + gpu_thread.add_frame(frame._frame_number, std::move(frame_data)); _timer_queries_pcollector.add_level_now(frame._queries.size()); #endif diff --git a/panda/src/pstatclient/config_pstatclient.cxx b/panda/src/pstatclient/config_pstatclient.cxx index cfe2040e49..51c040eb43 100644 --- a/panda/src/pstatclient/config_pstatclient.cxx +++ b/panda/src/pstatclient/config_pstatclient.cxx @@ -43,7 +43,7 @@ ConfigVariableBool pstats_threaded_write "broken with the threaded network interfaces.")); ConfigVariableInt pstats_max_queue_size -("pstats-max-queue-size", 1, +("pstats-max-queue-size", 32, PRC_DESC("If pstats-threaded-write is true, this specifies the maximum " "number of packets (generally, frames of data) that may be queued " "up for the thread to process. If this is large, the writer " diff --git a/panda/src/pstatclient/pStatClientImpl.I b/panda/src/pstatclient/pStatClientImpl.I index 8e38b9a486..a5a4ca038f 100644 --- a/panda/src/pstatclient/pStatClientImpl.I +++ b/panda/src/pstatclient/pStatClientImpl.I @@ -90,3 +90,12 @@ client_resume_after_pause() { double delta = _clock->get_short_time() - _last_frame; _delta -= delta; } + +/** + * + */ +INLINE PStatClientImpl::QueuedFrame:: +QueuedFrame(int thread_index, int frame_number) : + _thread_index(thread_index), + _frame_number(frame_number) { +} diff --git a/panda/src/pstatclient/pStatClientImpl.cxx b/panda/src/pstatclient/pStatClientImpl.cxx index eaa55fe56b..f69848718f 100644 --- a/panda/src/pstatclient/pStatClientImpl.cxx +++ b/panda/src/pstatclient/pStatClientImpl.cxx @@ -26,6 +26,8 @@ #include "cmath.h" #include "conditionVarWin32Impl.h" #include "conditionVarPosixImpl.h" +#include "genericThread.h" +#include "mutexHolder.h" #include <algorithm> @@ -53,9 +55,17 @@ PStatClientImpl(PStatClient *client) : _last_frame(0.0), _client(client), _reader(this, 0), +#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) + _writer(this,
0), + _thread_lock("PStatsClientImpl::_thread_lock"), + _thread_cvar(_thread_lock) +#else _writer(this, pstats_threaded_write ? 1 : 0) +#endif { +#if !defined(HAVE_THREADS) || defined(SIMPLE_THREADS) _writer.set_max_queue_size(pstats_max_queue_size); +#endif _reader.set_tcp_header_size(4); _writer.set_tcp_header_size(4); _is_connected = false; @@ -240,6 +250,17 @@ client_connect(std::string hostname, int port) { transmit_control_data(); } +#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) + if (_is_connected && pstats_threaded_write) { + _thread = new GenericThread("PStats", "PStats", [this]() { + this->thread_main(); + }); + if (!_thread->start(TP_low, false)) { + _thread.clear(); + } + } +#endif + return _is_connected; } @@ -248,6 +269,17 @@ client_connect(std::string hostname, int port) { */ void PStatClientImpl:: client_disconnect() { +#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) + // Tell the thread to shut itself down. Note that this may be called from + // the thread itself, so we shouldn't try to call join(). + _thread_lock.lock(); + if (_thread != nullptr) { + _thread_should_shutdown = true; + _thread_cvar.notify(); + } + _thread_lock.unlock(); +#endif + if (_thread_profiling) { // Switch the functions back to what they were. Thread::_sleep_func = &ThreadImpl::sleep; @@ -299,7 +331,11 @@ new_frame(int thread_index, int frame_number) { // If we're the main thread, we should exchange control packets with the // server. 
+#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) + if (thread_index == 0 && _thread == nullptr) { +#else if (thread_index == 0) { +#endif transmit_control_data(); } @@ -378,7 +414,7 @@ new_frame(int thread_index, int frame_number) { _client->start(pstats_index, current_thread_index, frame_start); if (!frame_data.is_empty()) { - transmit_frame_data(thread_index, frame_number, frame_data); + enqueue_frame_data(thread_index, frame_number, std::move(frame_data)); } _client->stop(pstats_index, current_thread_index, get_real_time()); } @@ -388,14 +424,18 @@ new_frame(int thread_index, int frame_number) { * data. */ void PStatClientImpl:: -add_frame(int thread_index, int frame_number, const PStatFrameData &frame_data) { +add_frame(int thread_index, int frame_number, PStatFrameData &&frame_data) { nassertv(thread_index >= 0 && thread_index < _client->_num_threads); PStatClient::InternalThread *pthread = _client->get_thread_ptr(thread_index); // If we're the main thread, we should exchange control packets with the // server. +#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) + if (thread_index == 0 && _thread == nullptr) { +#else if (thread_index == 0) { +#endif transmit_control_data(); } @@ -414,10 +454,84 @@ add_frame(int thread_index, int frame_number, const PStatFrameData &frame_data) int pstats_index = PStatClient::_pstats_pcollector.get_index(); _client->start(pstats_index, current_thread_index); - transmit_frame_data(thread_index, frame_number, frame_data); + enqueue_frame_data(thread_index, frame_number, std::move(frame_data)); _client->stop(pstats_index, current_thread_index); } +/** + * Passes off the frame data to the writer thread. If threading is disabled, + * transmits it right away. 
+ */ +void PStatClientImpl:: +enqueue_frame_data(int thread_index, int frame_number, + PStatFrameData &&frame_data) { +#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) + if (_thread != nullptr) { + int max_size = pstats_max_queue_size; + _thread_lock.lock(); + if (max_size < 0 || _frame_queue.size() < (size_t)max_size) { + _frame_queue.emplace_back(thread_index, frame_number); + frame_data.swap(_frame_queue.back()._frame_data); + } + _thread_cvar.notify(); + _thread_lock.unlock(); + return; + } +#endif + + // We don't have a thread, so transmit it directly. + if (_is_connected) { + transmit_frame_data(thread_index, frame_number, frame_data); + } +} + +#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) +/** + * + */ +void PStatClientImpl:: +thread_main() { + MutexHolder holder(_thread_lock); + transmit_control_data(); + + while (!_thread_should_shutdown) { + while (_frame_queue.empty() && !_thread_should_shutdown) { + _thread_cvar.wait(); + } + + while (!_frame_queue.empty()) { + // Dequeue up to 8 at a time, to decrease the amount of times we need to + // hold the lock. + QueuedFrame frames[8]; + int num_frames = 0; + + while (!_frame_queue.empty() && num_frames < 8) { + QueuedFrame &qf = _frame_queue.front(); + frames[num_frames]._thread_index = qf._thread_index; + frames[num_frames]._frame_number = qf._frame_number; + frames[num_frames]._frame_data.swap(qf._frame_data); + ++num_frames; + _frame_queue.pop_front(); + } + _thread_lock.unlock(); + + transmit_control_data(); + + if (num_frames > 0) { + for (int i = 0; i < num_frames; ++i) { + QueuedFrame &qf = frames[i]; + transmit_frame_data(qf._thread_index, qf._frame_number, qf._frame_data); + } + } + + _thread_lock.lock(); + } + } + + _thread = nullptr; +} +#endif + /** * Should be called once per frame per thread to transmit the latest data to * the PStatServer. 
diff --git a/panda/src/pstatclient/pStatClientImpl.h b/panda/src/pstatclient/pStatClientImpl.h index f758d4b262..34c14caa64 100644 --- a/panda/src/pstatclient/pStatClientImpl.h +++ b/panda/src/pstatclient/pStatClientImpl.h @@ -24,6 +24,8 @@ #include "queuedConnectionReader.h" #include "connectionWriter.h" #include "netAddress.h" +#include "pmutex.h" +#include "conditionVar.h" #include "trueClock.h" #include "pmap.h" @@ -66,9 +68,16 @@ public: INLINE void client_resume_after_pause(); void new_frame(int thread_index, int frame_number = -1); - void add_frame(int thread_index, int frame_number, const PStatFrameData &frame_data); + void add_frame(int thread_index, int frame_number, PStatFrameData &&frame_data); private: + void enqueue_frame_data(int thread_index, int frame_number, + PStatFrameData &&frame_data); + +#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) + void thread_main(); +#endif + void transmit_frame_data(int thread_index, int frame_number, const PStatFrameData &frame_data); @@ -100,6 +109,23 @@ private: PT(Connection) _tcp_connection; PT(Connection) _udp_connection; +#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS) + PT(Thread) _thread; + Mutex _thread_lock; + ConditionVar _thread_cvar; + bool _thread_should_shutdown = false; + + struct QueuedFrame { + QueuedFrame() = default; + QueuedFrame(int thread_index, int frame_number); + + int _thread_index; + int _frame_number; + PStatFrameData _frame_data; + }; + pdeque<QueuedFrame> _frame_queue; +#endif + int _collectors_reported; int _threads_reported; diff --git a/panda/src/pstatclient/pStatThread.cxx b/panda/src/pstatclient/pStatThread.cxx index 0315b5591f..d6ab806f01 100644 --- a/panda/src/pstatclient/pStatThread.cxx +++ b/panda/src/pstatclient/pStatThread.cxx @@ -35,9 +35,9 @@ new_frame(int frame_number) { * data to send for this frame.
*/ void PStatThread:: -add_frame(int frame_number, const PStatFrameData &frame_data) { +add_frame(int frame_number, PStatFrameData &&frame_data) { #ifdef DO_PSTATS - _client->get_impl()->add_frame(_index, frame_number, frame_data); + _client->get_impl()->add_frame(_index, frame_number, std::move(frame_data)); #endif } diff --git a/panda/src/pstatclient/pStatThread.h b/panda/src/pstatclient/pStatThread.h index 45a4a38e4b..4810b349cd 100644 --- a/panda/src/pstatclient/pStatThread.h +++ b/panda/src/pstatclient/pStatThread.h @@ -37,7 +37,7 @@ PUBLISHED: INLINE void operator = (const PStatThread &copy); void new_frame(int frame_number = -1); - void add_frame(int frame_number, const PStatFrameData &frame_data); + void add_frame(int frame_number, PStatFrameData &&frame_data); Thread *get_thread() const; INLINE int get_index() const;