From e86f48a210a15e278bc2d4b7fc91fb441cfa8b93 Mon Sep 17 00:00:00 2001 From: David Rose Date: Sat, 5 Dec 2009 07:01:36 +0000 Subject: [PATCH] attempt to smooth out threaded network layer --- panda/src/net/config_net.cxx | 59 +++++++++++++++++++----------- panda/src/net/config_net.h | 3 ++ panda/src/net/connection.cxx | 18 +++++++-- panda/src/net/connectionReader.cxx | 14 ++++++- 4 files changed, 66 insertions(+), 28 deletions(-) diff --git a/panda/src/net/config_net.cxx b/panda/src/net/config_net.cxx index d862033d63..eb856273a7 100644 --- a/panda/src/net/config_net.cxx +++ b/panda/src/net/config_net.cxx @@ -27,28 +27,6 @@ ConfigureFn(config_net) { } -//////////////////////////////////////////////////////////////////// -// Function: init_libnet -// Description: Initializes the library. This must be called at -// least once before any of the functions or classes in -// this library can be used. Normally it will be -// called by the static initializers and need not be -// called explicitly, but special cases exist. -//////////////////////////////////////////////////////////////////// -void -init_libnet() { - static bool initialized = false; - if (initialized) { - return; - } - initialized = true; - - NetDatagram::init_type(); - - PandaSystem *ps = PandaSystem::get_global_ptr(); - ps->add_system("net"); -} - // The following two maximum queue sizes are totally arbitrary and @@ -116,3 +94,40 @@ get_max_poll_cycle() { return *max_poll_cycle; } + +ConfigVariableInt net_max_read_per_epoch +("net-max-read-per-epoch", 1024, + PRC_DESC("The maximum number of bytes to read from the net in a single " + "thread epoch, when SIMPLE_THREADS is defined. This is designed " + "to minimize the impact of the networking layer on the other " + "threads.")); + +ConfigVariableInt net_max_write_per_epoch +("net-max-write-per-epoch", 1024, + PRC_DESC("The maximum number of bytes to write to the net in a single " + "thread epoch, when SIMPLE_THREADS is defined. This is designed " + "to minimize the impact of the networking layer on the other " + "threads.")); + + +//////////////////////////////////////////////////////////////////// +// Function: init_libnet +// Description: Initializes the library. This must be called at +// least once before any of the functions or classes in +// this library can be used. Normally it will be +// called by the static initializers and need not be +// called explicitly, but special cases exist. +//////////////////////////////////////////////////////////////////// +void +init_libnet() { + static bool initialized = false; + if (initialized) { + return; + } + initialized = true; + + NetDatagram::init_type(); + + PandaSystem *ps = PandaSystem::get_global_ptr(); + ps->add_system("net"); +} diff --git a/panda/src/net/config_net.h b/panda/src/net/config_net.h index 01099ca9d9..7ef19275df 100644 --- a/panda/src/net/config_net.h +++ b/panda/src/net/config_net.h @@ -29,6 +29,9 @@ extern int get_net_max_response_queue(); extern bool get_net_error_abort(); extern double get_max_poll_cycle(); +extern ConfigVariableInt net_max_read_per_epoch; +extern ConfigVariableInt net_max_write_per_epoch; + extern EXPCL_PANDA_NET void init_libnet(); #endif diff --git a/panda/src/net/connection.cxx b/panda/src/net/connection.cxx index e393847d75..9debcc316f 100644 --- a/panda/src/net/connection.cxx +++ b/panda/src/net/connection.cxx @@ -503,9 +503,10 @@ do_flush() { _queued_count = 0; _queued_data_start = TrueClock::get_global_ptr()->get_short_time(); - int data_sent = tcp->SendData(sending_data); - bool okflag = (data_sent == (int)sending_data.size()); #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) + int max_send = net_max_write_per_epoch; + int data_sent = tcp->SendData(sending_data.data(), min((size_t)max_send, sending_data.size())); + bool okflag = (data_sent == (int)sending_data.size()); if (!okflag) { int total_sent = 0; if (data_sent > 0) { @@ -514,14 +515,23 @@ do_flush() { double last_report = 0; while (!okflag && tcp->Active() && (data_sent > 0 || tcp->GetLastError() == LOCAL_BLOCKING_ERROR)) { - Thread::force_yield(); - data_sent = tcp->SendData(sending_data.data() + total_sent, sending_data.size() - total_sent); + if (data_sent == 0) { + Thread::force_yield(); + } else { + Thread::consider_yield(); + } + data_sent = tcp->SendData(sending_data.data() + total_sent, min((size_t)max_send, sending_data.size() - total_sent)); if (data_sent > 0) { total_sent += data_sent; } okflag = (total_sent == (int)sending_data.size()); } } + +#else // SIMPLE_THREADS + int data_sent = tcp->SendData(sending_data); + bool okflag = (data_sent == (int)sending_data.size()); + #endif // SIMPLE_THREADS return check_send_error(okflag); diff --git a/panda/src/net/connectionReader.cxx b/panda/src/net/connectionReader.cxx index 862f021394..4d4667bc37 100644 --- a/panda/src/net/connectionReader.cxx +++ b/panda/src/net/connectionReader.cxx @@ -621,6 +621,7 @@ process_incoming_tcp_data(SocketInfo *sinfo) { } header_bytes_read += bytes_read; + Thread::consider_yield(); } // Now we must decode the header to determine how big the datagram @@ -643,15 +644,23 @@ process_incoming_tcp_data(SocketInfo *sinfo) { while (!_shutdown && (int)datagram.get_length() < size) { int bytes_read; + int read_bytes = read_buffer_size; +#ifdef SIMPLE_THREADS + // In the SIMPLE_THREADS case, we want to limit the number of + // bytes we read in a single epoch, to minimize the impact on the + // other threads. + read_bytes = min(read_buffer_size, (int)net_max_read_per_epoch); +#endif + bytes_read = - socket->RecvData(buffer, min(read_buffer_size, + socket->RecvData(buffer, min(read_bytes, (int)(size - datagram.get_length()))); #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR && socket->Active()) { Thread::force_yield(); bytes_read = - socket->RecvData(buffer, min(read_buffer_size, + socket->RecvData(buffer, min(read_bytes, (int)(size - datagram.get_length()))); } #endif // SIMPLE_THREADS @@ -678,6 +687,7 @@ process_incoming_tcp_data(SocketInfo *sinfo) { << "Discarding " << bytes_read - datagram_bytes << " bytes following TCP datagram.\n"; } + Thread::consider_yield(); } // Now that we've read all the data, it's time to finish the socket