fix pstats-threaded-write in simple-threads case

David Rose 2008-09-13 03:39:21 +00:00
parent 52fa05d6f9
commit fd0af5b6ef
12 changed files with 135 additions and 35 deletions

View File

@ -18,7 +18,7 @@
class Socket_fdset
{
public:
PUBLISHED:
inline Socket_fdset();
inline void setForSocket(const Socket_IP &incon);

View File

@ -158,12 +158,13 @@ bool Socket_TCP::ActiveOpenNonBlocking(const Socket_Address & theaddress)
SetNonBlocking();
SetReuseAddress();
if (DO_CONNECT(_socket, &theaddress.GetAddressInfo()) != 0)
if (DO_CONNECT(_socket, &theaddress.GetAddressInfo()) != 0) {
if (GETERROR() != LOCAL_CONNECT_BLOCKING)
{
printf(" Non-Blocking Connect Error %d", GETERROR());
return ErrorClose();
}
}
return true;
}

View File

@ -367,8 +367,13 @@ send_datagram(const NetDatagram &datagram, int tcp_header_size) {
data += header.get_header();
data += datagram.get_message();
if (net_cat.is_debug()) {
header.verify_datagram(datagram);
}
int bytes_to_send = data.length();
Socket_Address addr = datagram.get_address().get_addr();
bool okflag = udp->SendTo(data, addr);
#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
while (!okflag && udp->GetLastError() == LOCAL_BLOCKING_ERROR) {
@ -377,13 +382,9 @@ send_datagram(const NetDatagram &datagram, int tcp_header_size) {
}
#endif // SIMPLE_THREADS
if (net_cat.is_debug()) {
header.verify_datagram(datagram);
}
if (net_cat.is_spam()) {
net_cat.spam()
<< "Sending UDP datagram with "
<< "Sent UDP datagram with "
<< bytes_to_send << " bytes to " << (void *)this
<< ", ok = " << okflag << "\n";
}
@ -449,7 +450,7 @@ send_raw_datagram(const NetDatagram &datagram) {
if (net_cat.is_spam()) {
net_cat.spam()
<< "Sending UDP datagram with "
<< "Sent UDP datagram with "
<< data.size() << " bytes to " << (void *)this
<< ", ok = " << okflag << "\n";
}

View File

@ -19,6 +19,7 @@
#include "netAddress.h"
#include "config_net.h"
#include "mutexHolder.h"
#include "trueClock.h"
#ifdef WIN32_VC
#include <winsock2.h> // For gethostname()
@ -145,7 +146,37 @@ open_TCP_server_rendezvous(int port, int backlog) {
PT(Connection) ConnectionManager::
open_TCP_client_connection(const NetAddress &address, int timeout_ms) {
Socket_TCP *socket = new Socket_TCP;
bool okflag = socket->ActiveOpen(address.get_addr(),true);
#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
// In the simple-threads case, use a non-blocking connect.
bool okflag = socket->ActiveOpenNonBlocking(address.get_addr());
if (okflag && socket->GetLastError() == LOCAL_CONNECT_BLOCKING) {
// Now wait for the socket to connect.
TrueClock *clock = TrueClock::get_global_ptr();
double start = clock->get_short_time();
Thread::force_yield();
Socket_fdset fset;
fset.setForSocket(*socket);
int ready = fset.WaitForWrite(true, 0);
while (ready == 0) {
double elapsed = clock->get_short_time() - start;
if (elapsed * 1000.0 > timeout_ms) {
// Timeout.
okflag = false;
break;
}
Thread::force_yield();
fset.setForSocket(*socket);
ready = fset.WaitForWrite(true, 0);
}
}
#else
// In the normal case, we can just do a blocking connect.
bool okflag = socket->ActiveOpen(address.get_addr(), false);
#endif // SIMPLE_THREADS
if (!okflag) {
net_cat.error()
<< "Unable to open TCP connection to server "
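For reference, a minimal caller-side sketch of the interface this change affects. The host, port, and timeout values are hypothetical, and QueuedConnectionManager stands in for whatever concrete ConnectionManager the application uses:

#include "queuedConnectionManager.h"
#include "netAddress.h"
#include "connection.h"

QueuedConnectionManager manager;
NetAddress address;
address.set_host("localhost", 5185);   // hypothetical PStats server

// With HAVE_THREADS and SIMPLE_THREADS defined, this now performs a
// non-blocking connect and yields between polls of WaitForWrite(), so
// other simple threads keep running while the connect is pending; in
// the true-threads or no-threads case it still blocks as before.
PT(Connection) conn = manager.open_TCP_client_connection(address, 5000);
if (conn == (Connection *)NULL) {
  // The connect failed or the 5000 ms timeout expired.
}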

View File

@ -98,10 +98,7 @@ ConnectionReader::
ConnectionReader(ConnectionManager *manager, int num_threads) :
_manager(manager)
{
if (!Thread::is_true_threads()) {
// There is no point in using threads for this kind of I/O unless
// we actually have real threads available (i.e. HAVE_THREADS is
// defined, and SIMPLE_THREADS is not).
if (!Thread::is_threading_supported()) {
#ifndef NDEBUG
if (num_threads != 0) {
if (net_cat.is_debug()) {

View File

@ -57,10 +57,7 @@ ConnectionWriter::
ConnectionWriter(ConnectionManager *manager, int num_threads) :
_manager(manager)
{
if (!Thread::is_true_threads()) {
// There is no point in using threads for this kind of I/O unless
// we actually have real threads available (i.e. HAVE_THREADS is
// defined, and SIMPLE_THREADS is not).
if (!Thread::is_threading_supported()) {
#ifndef NDEBUG
if (num_threads != 0) {
if (net_cat.is_debug()) {
@ -109,6 +106,38 @@ ConnectionWriter::
(*ti)->join();
}
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionWriter::set_max_queue_size
// Access: Public
// Description: Limits the number of packets that may be pending on
// the outbound queue. This only has an effect when
// using threads; if num_threads is 0, then all packets
// are sent immediately.
////////////////////////////////////////////////////////////////////
void ConnectionWriter::
set_max_queue_size(int max_size) {
_queue.set_max_queue_size(max_size);
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionWriter::get_max_queue_size
// Access: Public
// Description: Returns the maximum size the queue is allowed to grow
// to. See set_max_queue_size().
////////////////////////////////////////////////////////////////////
int ConnectionWriter::
get_max_queue_size() const {
return _queue.get_max_queue_size();
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionWriter::get_current_queue_size
// Access: Public
// Description: Returns the current number of things in the queue.
////////////////////////////////////////////////////////////////////
int ConnectionWriter::
get_current_queue_size() const {
return _queue.get_current_queue_size();
}
////////////////////////////////////////////////////////////////////
@ -125,9 +154,13 @@ ConnectionWriter::
// only returns false if the send queue is filled; it's
// impossible to detect a transmission error at this
// point.
//
// If block is true, this will not return false if the
// send queue is filled; instead, it will wait until
// there is space available.
////////////////////////////////////////////////////////////////////
bool ConnectionWriter::
send(const Datagram &datagram, const PT(Connection) &connection) {
send(const Datagram &datagram, const PT(Connection) &connection, bool block) {
nassertr(connection != (Connection *)NULL, false);
nassertr(connection->get_socket()->is_exact_type(Socket_TCP::get_class_type()), false);
@ -141,7 +174,7 @@ send(const Datagram &datagram, const PT(Connection) &connection) {
return connection->send_datagram(copy, _tcp_header_size);
}
} else {
return _queue.insert(copy);
return _queue.insert(copy, block);
}
}
@ -160,10 +193,14 @@ send(const Datagram &datagram, const PT(Connection) &connection) {
// only returns false if the send queue is filled; it's
// impossible to detect a transmission error at this
// point.
//
// If block is true, this will not return false if the
// send queue is filled; instead, it will wait until
// there is space available.
////////////////////////////////////////////////////////////////////
bool ConnectionWriter::
send(const Datagram &datagram, const PT(Connection) &connection,
const NetAddress &address) {
const NetAddress &address, bool block) {
nassertr(connection != (Connection *)NULL, false);
nassertr(connection->get_socket()->is_exact_type(Socket_UDP::get_class_type()), false);
@ -186,7 +223,7 @@ send(const Datagram &datagram, const PT(Connection) &connection,
return connection->send_datagram(copy, _tcp_header_size);
}
} else {
return _queue.insert(copy);
return _queue.insert(copy, block);
}
}
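A short usage sketch of the new block flag and queue-size accessors. This is an assumed caller: the manager, connection, and datagram setup are illustrative, not part of this commit.

#include "connectionWriter.h"
#include "netDatagram.h"

// Assume 'manager' is a ConnectionManager and 'conn' is an open TCP
// connection.  One writer thread, so datagrams are queued rather than
// sent inline.
ConnectionWriter writer(&manager, 1);
writer.set_max_queue_size(1);        // allow at most one pending datagram

NetDatagram dg;
dg.add_string("frame data");         // placeholder payload

// Default (block = false): returns false and drops the datagram if the
// queue is already full.
bool queued = writer.send(dg, conn);

// block = true: waits for space in the queue instead of failing, which
// is what the PStats client now does for its control messages.
writer.send(dg, conn, true);

int pending = writer.get_current_queue_size();  // datagrams still waiting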

View File

@ -40,12 +40,18 @@ PUBLISHED:
ConnectionWriter(ConnectionManager *manager, int num_threads);
~ConnectionWriter();
bool send(const Datagram &datagram,
const PT(Connection) &connection);
void set_max_queue_size(int max_size);
int get_max_queue_size() const;
int get_current_queue_size() const;
bool send(const Datagram &datagram,
const PT(Connection) &connection,
bool block = false);
bool send(const Datagram &datagram,
const PT(Connection) &connection,
const NetAddress &address);
bool send(const Datagram &datagram,
const PT(Connection) &connection,
const NetAddress &address,
bool block = false);
bool is_valid_for_udp(const Datagram &datagram) const;

View File

@ -68,16 +68,27 @@ shutdown() {
// threads are waiting on the queue, this will wake one
// of them up. Returns true if successful, false if the
// queue was full.
//
// If block is true, this will not return until
// successful, waiting until the queue has space
// available if necessary.
////////////////////////////////////////////////////////////////////
bool DatagramQueue::
insert(const NetDatagram &data) {
insert(const NetDatagram &data, bool block) {
MutexHolder holder(_cvlock);
bool enqueue_ok = ((int)_queue.size() < _max_queue_size);
if (block) {
while (!enqueue_ok && !_shutdown) {
_cv.wait();
enqueue_ok = ((int)_queue.size() < _max_queue_size);
}
}
if (enqueue_ok) {
_queue.push_back(data);
}
_cv.signal();
_cv.signal(); // Only need to wake up one thread.
return enqueue_ok;
}
@ -121,6 +132,9 @@ extract(NetDatagram &result) {
result = _queue.front();
_queue.pop_front();
// Wake up any threads waiting to stuff things into the queue.
_cv.signal_all();
return true;
}
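The blocking insert above is the standard bounded-queue pattern around a condition variable. A self-contained toy sketch of the same protocol, built on the same Panda3D primitives (generic names, shutdown handling omitted; this is not the actual DatagramQueue code):

#include "pmutex.h"
#include "conditionVarFull.h"
#include "mutexHolder.h"
#include "pdeque.h"

class ToyQueue {
public:
  ToyQueue() : _cv(_lock), _max_size(1) {}

  bool insert(int value, bool block) {
    MutexHolder holder(_lock);
    bool ok = ((int)_queue.size() < _max_size);
    if (block) {
      while (!ok) {              // re-check the condition after every wakeup
        _cv.wait();
        ok = ((int)_queue.size() < _max_size);
      }
    }
    if (ok) {
      _queue.push_back(value);
    }
    _cv.signal();                // wake one waiting consumer, if any
    return ok;
  }

  bool extract(int &result) {
    MutexHolder holder(_lock);
    while (_queue.empty()) {
      _cv.wait();
    }
    result = _queue.front();
    _queue.pop_front();
    _cv.signal_all();            // wake any producers blocked in insert()
    return true;
  }

private:
  Mutex _lock;
  ConditionVarFull _cv;
  int _max_size;
  pdeque<int> _queue;
};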

View File

@ -34,7 +34,7 @@ public:
~DatagramQueue();
void shutdown();
bool insert(const NetDatagram &data);
bool insert(const NetDatagram &data, bool block = false);
bool extract(NetDatagram &result);
void set_max_queue_size(int max_size);
@ -43,7 +43,7 @@ public:
private:
Mutex _cvlock;
ConditionVarFull _cv;
ConditionVarFull _cv; // signaled when queue contents change.
typedef pdeque<NetDatagram> QueueType;
QueueType _queue;

View File

@ -39,6 +39,15 @@ ConfigVariableBool pstats_threaded_write
"wouldn't want this set true, unless you suspect something is "
"broken with the threaded network interfaces."));
ConfigVariableInt pstats_max_queue_size
("pstats-max-queue-size", 1,
PRC_DESC("If pstats-threaded-write is true, this specifies the maximum "
"number of packets (generally, frames of data) that may be queued "
"up for the thread to process. If this is large, the writer "
"thread may fall behind and the output of PStats will lag. Keep "
"this small to drop missed packets on the floor instead, and "
"ensure that the frame data does not grow stale."));
ConfigVariableDouble pstats_tcp_ratio
("pstats-tcp-ratio", 0.01,
PRC_DESC("This specifies the ratio of frame update messages that are eligible "
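In practice these variables are set in a Config.prc file. A hypothetical excerpt (values chosen for illustration, not recommended defaults):

# Send PStats data from a separate writer thread...
pstats-threaded-write 1
# ...but keep the outbound queue tiny, so a slow connection drops
# stale frames instead of delivering them late.
pstats-max-queue-size 1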

View File

@ -32,6 +32,7 @@ NotifyCategoryDecl(pstats, EXPCL_PANDA_PSTATCLIENT, EXPTP_PANDA_PSTATCLIENT);
extern EXPCL_PANDA_PSTATCLIENT ConfigVariableString pstats_name;
extern EXPCL_PANDA_PSTATCLIENT ConfigVariableDouble pstats_max_rate;
extern EXPCL_PANDA_PSTATCLIENT ConfigVariableBool pstats_threaded_write;
extern EXPCL_PANDA_PSTATCLIENT ConfigVariableInt pstats_max_queue_size;
extern EXPCL_PANDA_PSTATCLIENT ConfigVariableDouble pstats_tcp_ratio;
extern EXPCL_PANDA_PSTATCLIENT ConfigVariableString pstats_host;

View File

@ -47,6 +47,7 @@ PStatClientImpl(PStatClient *client) :
_reader(this, 0),
_writer(this, pstats_threaded_write ? 1 : 0)
{
_writer.set_max_queue_size(pstats_max_queue_size);
_reader.set_tcp_header_size(4);
_writer.set_tcp_header_size(4);
_is_connected = false;
@ -289,11 +290,13 @@ transmit_frame_data(int thread_index) {
thread->_next_packet = now + packet_delay;
if (!sent) {
if (pstats_cat.is_debug()) {
pstats_cat.debug()
<< "Couldn't send packet.\n";
}
}
}
}
}
////////////////////////////////////////////////////////////////////
@ -363,7 +366,7 @@ send_hello() {
Datagram datagram;
message.encode(datagram);
_writer.send(datagram, _tcp_connection);
_writer.send(datagram, _tcp_connection, true);
}
////////////////////////////////////////////////////////////////////
@ -393,7 +396,7 @@ report_new_collectors() {
Datagram datagram;
message.encode(datagram);
_writer.send(datagram, _tcp_connection);
_writer.send(datagram, _tcp_connection, true);
}
}
@ -418,7 +421,7 @@ report_new_threads() {
Datagram datagram;
message.encode(datagram);
_writer.send(datagram, _tcp_connection);
_writer.send(datagram, _tcp_connection, true);
}
}