pstats: Offload more work to separate PStats thread

This improves performance of PStats client when a lot of data is being sent per frame.

Also increase the value of pstats-max-queue-size, since otherwise we're constantly dropping frames when using multiple threads.
This commit is contained in:
rdb 2022-12-03 22:20:23 +01:00
parent 82522a7bef
commit bb530435e8
7 changed files with 158 additions and 9 deletions

View File

@ -4522,7 +4522,7 @@ end_frame_timing(const FrameTiming &frame) {
// The end time of the last collector is implicitly the frame's end time.
frame_data.add_stop(0, frame_data.get_end());
gpu_thread.add_frame(frame._frame_number, frame_data);
gpu_thread.add_frame(frame._frame_number, std::move(frame_data));
_timer_queries_pcollector.add_level_now(frame._queries.size());
#endif

View File

@ -43,7 +43,7 @@ ConfigVariableBool pstats_threaded_write
"broken with the threaded network interfaces."));
ConfigVariableInt pstats_max_queue_size
("pstats-max-queue-size", 1,
("pstats-max-queue-size", 32,
PRC_DESC("If pstats-threaded-write is true, this specifies the maximum "
"number of packets (generally, frames of data) that may be queued "
"up for the thread to process. If this is large, the writer "

View File

@ -90,3 +90,12 @@ client_resume_after_pause() {
double delta = _clock->get_short_time() - _last_frame;
_delta -= delta;
}
/**
*
*/
INLINE PStatClientImpl::QueuedFrame::
QueuedFrame(int thread_index, int frame_number) :
_thread_index(thread_index),
_frame_number(frame_number) {
}

View File

@ -26,6 +26,8 @@
#include "cmath.h"
#include "conditionVarWin32Impl.h"
#include "conditionVarPosixImpl.h"
#include "genericThread.h"
#include "mutexHolder.h"
#include <algorithm>
@ -53,9 +55,17 @@ PStatClientImpl(PStatClient *client) :
_last_frame(0.0),
_client(client),
_reader(this, 0),
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
_writer(this, 0),
_thread_lock("PStatsClientImpl::_thread_lock"),
_thread_cvar(_thread_lock)
#else
_writer(this, pstats_threaded_write ? 1 : 0)
#endif
{
#if !defined(HAVE_THREADS) || defined(SIMPLE_THREADS)
_writer.set_max_queue_size(pstats_max_queue_size);
#endif
_reader.set_tcp_header_size(4);
_writer.set_tcp_header_size(4);
_is_connected = false;
@ -240,6 +250,17 @@ client_connect(std::string hostname, int port) {
transmit_control_data();
}
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
if (_is_connected && pstats_threaded_write) {
_thread = new GenericThread("PStats", "PStats", [this]() {
this->thread_main();
});
if (!_thread->start(TP_low, false)) {
_thread.clear();
}
}
#endif
return _is_connected;
}
@ -248,6 +269,17 @@ client_connect(std::string hostname, int port) {
*/
void PStatClientImpl::
client_disconnect() {
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
// Tell the thread to shut itself down. Note that this may be called from
// the thread itself, so we shouldn't try to call join().
_thread_lock.lock();
if (_thread != nullptr) {
_thread_should_shutdown = true;
_thread_cvar.notify();
}
_thread_lock.unlock();
#endif
if (_thread_profiling) {
// Switch the functions back to what they were.
Thread::_sleep_func = &ThreadImpl::sleep;
@ -299,7 +331,11 @@ new_frame(int thread_index, int frame_number) {
// If we're the main thread, we should exchange control packets with the
// server.
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
if (thread_index == 0 && _thread == nullptr) {
#else
if (thread_index == 0) {
#endif
transmit_control_data();
}
@ -378,7 +414,7 @@ new_frame(int thread_index, int frame_number) {
_client->start(pstats_index, current_thread_index, frame_start);
if (!frame_data.is_empty()) {
transmit_frame_data(thread_index, frame_number, frame_data);
enqueue_frame_data(thread_index, frame_number, std::move(frame_data));
}
_client->stop(pstats_index, current_thread_index, get_real_time());
}
@ -388,14 +424,18 @@ new_frame(int thread_index, int frame_number) {
* data.
*/
void PStatClientImpl::
add_frame(int thread_index, int frame_number, const PStatFrameData &frame_data) {
add_frame(int thread_index, int frame_number, PStatFrameData &&frame_data) {
nassertv(thread_index >= 0 && thread_index < _client->_num_threads);
PStatClient::InternalThread *pthread = _client->get_thread_ptr(thread_index);
// If we're the main thread, we should exchange control packets with the
// server.
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
if (thread_index == 0 && _thread == nullptr) {
#else
if (thread_index == 0) {
#endif
transmit_control_data();
}
@ -414,10 +454,84 @@ add_frame(int thread_index, int frame_number, const PStatFrameData &frame_data)
int pstats_index = PStatClient::_pstats_pcollector.get_index();
_client->start(pstats_index, current_thread_index);
transmit_frame_data(thread_index, frame_number, frame_data);
enqueue_frame_data(thread_index, frame_number, std::move(frame_data));
_client->stop(pstats_index, current_thread_index);
}
/**
* Passes off the frame data to the writer thread. If threading is disabled,
* transmits it right away.
*/
void PStatClientImpl::
enqueue_frame_data(int thread_index, int frame_number,
PStatFrameData &&frame_data) {
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
if (_thread != nullptr) {
int max_size = pstats_max_queue_size;
_thread_lock.lock();
if (max_size < 0 || _frame_queue.size() < (size_t)max_size) {
_frame_queue.emplace_back(thread_index, frame_number);
frame_data.swap(_frame_queue.back()._frame_data);
}
_thread_cvar.notify();
_thread_lock.unlock();
return;
}
#endif
// We don't have a thread, so transmit it directly.
if (_is_connected) {
transmit_frame_data(thread_index, frame_number, frame_data);
}
}
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
/**
*
*/
void PStatClientImpl::
thread_main() {
MutexHolder holder(_thread_lock);
transmit_control_data();
while (!_thread_should_shutdown) {
while (_frame_queue.empty() && !_thread_should_shutdown) {
_thread_cvar.wait();
}
while (!_frame_queue.empty()) {
// Dequeue up to 8 at a time, to decrease the amount of times we need to
// hold the lock.
QueuedFrame frames[8];
int num_frames = 0;
while (!_frame_queue.empty() && num_frames < 8) {
QueuedFrame &qf = _frame_queue.front();
frames[num_frames]._thread_index = qf._thread_index;
frames[num_frames]._frame_number = qf._frame_number;
frames[num_frames]._frame_data.swap(qf._frame_data);
++num_frames;
_frame_queue.pop_front();
}
_thread_lock.unlock();
transmit_control_data();
if (num_frames > 0) {
for (int i = 0; i < num_frames; ++i) {
QueuedFrame &qf = frames[i];
transmit_frame_data(qf._thread_index, qf._frame_number, qf._frame_data);
}
}
_thread_lock.lock();
}
}
_thread = nullptr;
}
#endif
/**
* Should be called once per frame per thread to transmit the latest data to
* the PStatServer.

View File

@ -24,6 +24,8 @@
#include "queuedConnectionReader.h"
#include "connectionWriter.h"
#include "netAddress.h"
#include "pmutex.h"
#include "conditionVar.h"
#include "trueClock.h"
#include "pmap.h"
@ -66,9 +68,16 @@ public:
INLINE void client_resume_after_pause();
void new_frame(int thread_index, int frame_number = -1);
void add_frame(int thread_index, int frame_number, const PStatFrameData &frame_data);
void add_frame(int thread_index, int frame_number, PStatFrameData &&frame_data);
private:
void enqueue_frame_data(int thread_index, int frame_number,
PStatFrameData &&frame_data);
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
void thread_main();
#endif
void transmit_frame_data(int thread_index, int frame_number,
const PStatFrameData &frame_data);
@ -100,6 +109,23 @@ private:
PT(Connection) _tcp_connection;
PT(Connection) _udp_connection;
#if defined(HAVE_THREADS) && !defined(SIMPLE_THREADS)
PT(Thread) _thread;
Mutex _thread_lock;
ConditionVar _thread_cvar;
bool _thread_should_shutdown = false;
struct QueuedFrame {
QueuedFrame() = default;
QueuedFrame(int thread_index, int frame_number);
int _thread_index;
int _frame_number;
PStatFrameData _frame_data;
};
pdeque<QueuedFrame> _frame_queue;
#endif
int _collectors_reported;
int _threads_reported;

View File

@ -35,9 +35,9 @@ new_frame(int frame_number) {
* data to send for this frame.
*/
void PStatThread::
add_frame(int frame_number, const PStatFrameData &frame_data) {
add_frame(int frame_number, PStatFrameData &&frame_data) {
#ifdef DO_PSTATS
_client->get_impl()->add_frame(_index, frame_number, frame_data);
_client->get_impl()->add_frame(_index, frame_number, std::move(frame_data));
#endif
}

View File

@ -37,7 +37,7 @@ PUBLISHED:
INLINE void operator = (const PStatThread &copy);
void new_frame(int frame_number = -1);
void add_frame(int frame_number, const PStatFrameData &frame_data);
void add_frame(int frame_number, PStatFrameData &&frame_data);
Thread *get_thread() const;
INLINE int get_index() const;