attempt to smooth out threaded network layer

This commit is contained in:
David Rose 2009-12-05 07:01:36 +00:00
parent 693c3171b7
commit e86f48a210
4 changed files with 66 additions and 28 deletions

View File

@ -27,28 +27,6 @@ ConfigureFn(config_net) {
}
////////////////////////////////////////////////////////////////////
// Function: init_libnet
// Description: Initializes the library. This must be called at
// least once before any of the functions or classes in
// this library can be used. Normally it will be
// called by the static initializers and need not be
// called explicitly, but special cases exist.
////////////////////////////////////////////////////////////////////
void
init_libnet() {
static bool initialized = false;
if (initialized) {
return;
}
initialized = true;
NetDatagram::init_type();
PandaSystem *ps = PandaSystem::get_global_ptr();
ps->add_system("net");
}
// The following two maximum queue sizes are totally arbitrary and
@ -116,3 +94,40 @@ get_max_poll_cycle() {
return *max_poll_cycle;
}
ConfigVariableInt net_max_read_per_epoch
("net-max-read-per-epoch", 1024,
PRC_DESC("The maximum number of bytes to read from the net in a single "
"thread epoch, when SIMPLE_THREADS is defined. This is designed "
"to minimize the impact of the networking layer on the other "
"threads."));
ConfigVariableInt net_max_write_per_epoch
("net-max-write-per-epoch", 1024,
PRC_DESC("The maximum number of bytes to write to the net in a single "
"thread epoch, when SIMPLE_THREADS is defined. This is designed "
"to minimize the impact of the networking layer on the other "
"threads."));
////////////////////////////////////////////////////////////////////
// Function: init_libnet
// Description: Initializes the library. This must be called at
// least once before any of the functions or classes in
// this library can be used. Normally it will be
// called by the static initializers and need not be
// called explicitly, but special cases exist.
////////////////////////////////////////////////////////////////////
void
init_libnet() {
static bool initialized = false;
if (initialized) {
return;
}
initialized = true;
NetDatagram::init_type();
PandaSystem *ps = PandaSystem::get_global_ptr();
ps->add_system("net");
}

View File

@ -29,6 +29,9 @@ extern int get_net_max_response_queue();
extern bool get_net_error_abort();
extern double get_max_poll_cycle();
extern ConfigVariableInt net_max_read_per_epoch;
extern ConfigVariableInt net_max_write_per_epoch;
extern EXPCL_PANDA_NET void init_libnet();
#endif

View File

@ -503,9 +503,10 @@ do_flush() {
_queued_count = 0;
_queued_data_start = TrueClock::get_global_ptr()->get_short_time();
int data_sent = tcp->SendData(sending_data);
bool okflag = (data_sent == (int)sending_data.size());
#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
int max_send = net_max_write_per_epoch;
int data_sent = tcp->SendData(sending_data.data(), min((size_t)max_send, sending_data.size()));
bool okflag = (data_sent == (int)sending_data.size());
if (!okflag) {
int total_sent = 0;
if (data_sent > 0) {
@ -514,14 +515,23 @@ do_flush() {
double last_report = 0;
while (!okflag && tcp->Active() &&
(data_sent > 0 || tcp->GetLastError() == LOCAL_BLOCKING_ERROR)) {
Thread::force_yield();
data_sent = tcp->SendData(sending_data.data() + total_sent, sending_data.size() - total_sent);
if (data_sent == 0) {
Thread::force_yield();
} else {
Thread::consider_yield();
}
data_sent = tcp->SendData(sending_data.data() + total_sent, min((size_t)max_send, sending_data.size() - total_sent));
if (data_sent > 0) {
total_sent += data_sent;
}
okflag = (total_sent == (int)sending_data.size());
}
}
#else // SIMPLE_THREADS
int data_sent = tcp->SendData(sending_data);
bool okflag = (data_sent == (int)sending_data.size());
#endif // SIMPLE_THREADS
return check_send_error(okflag);

View File

@ -621,6 +621,7 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
}
header_bytes_read += bytes_read;
Thread::consider_yield();
}
// Now we must decode the header to determine how big the datagram
@ -643,15 +644,23 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
while (!_shutdown && (int)datagram.get_length() < size) {
int bytes_read;
int read_bytes = read_buffer_size;
#ifdef SIMPLE_THREADS
// In the SIMPLE_THREADS case, we want to limit the number of
// bytes we read in a single epoch, to minimize the impact on the
// other threads.
read_bytes = min(read_buffer_size, (int)net_max_read_per_epoch);
#endif
bytes_read =
socket->RecvData(buffer, min(read_buffer_size,
socket->RecvData(buffer, min(read_bytes,
(int)(size - datagram.get_length())));
#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR &&
socket->Active()) {
Thread::force_yield();
bytes_read =
socket->RecvData(buffer, min(read_buffer_size,
socket->RecvData(buffer, min(read_bytes,
(int)(size - datagram.get_length())));
}
#endif // SIMPLE_THREADS
@ -678,6 +687,7 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
<< "Discarding " << bytes_read - datagram_bytes
<< " bytes following TCP datagram.\n";
}
Thread::consider_yield();
}
// Now that we've read all the data, it's time to finish the socket