From 00b3e0ca78e876ef7c0f515912c4762101d4905f Mon Sep 17 00:00:00 2001 From: David Rose Date: Wed, 2 Apr 2003 18:52:16 +0000 Subject: [PATCH] add collect-tcp --- direct/src/distributed/ClientRepository.py | 8 + panda/src/downloader/httpChannel.cxx | 7 +- panda/src/downloader/httpClient.cxx | 9 +- panda/src/downloader/socketStream.I | 196 ++++++++++++++ panda/src/downloader/socketStream.cxx | 8 +- panda/src/downloader/socketStream.h | 27 ++ panda/src/express/config_express.cxx | 6 + panda/src/express/config_express.h | 3 + panda/src/net/connection.cxx | 299 ++++++++++++++++----- panda/src/net/connection.h | 25 +- panda/src/net/connectionWriter.cxx | 12 - panda/src/net/queuedConnectionReader.cxx | 6 +- 12 files changed, 507 insertions(+), 99 deletions(-) diff --git a/direct/src/distributed/ClientRepository.py b/direct/src/distributed/ClientRepository.py index 009415101c..e64d96622b 100644 --- a/direct/src/distributed/ClientRepository.py +++ b/direct/src/distributed/ClientRepository.py @@ -208,6 +208,14 @@ class ClientRepository(DirectObject.DirectObject): # to None; enforce that condition if not self.tcpConn: return 0 + + # Make sure any recently-sent datagrams are flushed when the + # time expires, if we're in collect-tcp mode. + # Temporary try .. except for old Pandas. + try: + self.tcpConn.considerFlush() + except: + pass if self.connectHttp: datagram = Datagram() diff --git a/panda/src/downloader/httpChannel.cxx b/panda/src/downloader/httpChannel.cxx index 16b3e30c3d..40bed411ef 100644 --- a/panda/src/downloader/httpChannel.cxx +++ b/panda/src/downloader/httpChannel.cxx @@ -1755,14 +1755,17 @@ http_getline(string &str) { // Access: Private // Description: Sends a series of lines to the server. Returns true // if the buffer is fully sent, or false if some of it -// remains. +// remains. If this returns false, the function must be +// called again later, passing in the exact same string, +// until the return value is true. //////////////////////////////////////////////////////////////////// bool HTTPChannel:: http_send(const string &str) { nassertr(str.length() > _sent_so_far, true); // Use the underlying BIO to write to the server, instead of the - // BIOStream, which would insist on blocking. + // BIOStream, which would insist on blocking (and might furthermore + // delay the send due to collect-tcp mode being enabled). size_t bytes_to_send = str.length() - _sent_so_far; int write_count = BIO_write(*_bio, str.data() + _sent_so_far, bytes_to_send); diff --git a/panda/src/downloader/httpClient.cxx b/panda/src/downloader/httpClient.cxx index 849cd0f3cf..3fa1b1d281 100644 --- a/panda/src/downloader/httpClient.cxx +++ b/panda/src/downloader/httpClient.cxx @@ -319,8 +319,13 @@ clear_expected_servers() { // Description: Returns a new HTTPChannel object that may be used // for reading multiple documents using the same // connection, for greater network efficiency than -// calling HTTPClient::get_document() repeatedly (and -// thus forcing a new connection for each document). +// calling HTTPClient::get_document() repeatedly (which +// would force a new connection for each document). +// +// Also, HTTPChannel has some additional, less common +// interface methods than the basic interface methods +// that exist on HTTPClient; if you wish to call any of +// these methods you must first obtain an HTTPChannel. // // Pass true for persistent_connection to gain this // network efficiency. If, on the other hand, your diff --git a/panda/src/downloader/socketStream.I b/panda/src/downloader/socketStream.I index d37bc3c131..960f1607f7 100644 --- a/panda/src/downloader/socketStream.I +++ b/panda/src/downloader/socketStream.I @@ -34,6 +34,104 @@ ISocketStream(streambuf *buf) : istream(buf) { //////////////////////////////////////////////////////////////////// INLINE OSocketStream:: OSocketStream(streambuf *buf) : ostream(buf) { + _collect_tcp = collect_tcp; + _collect_tcp_interval = collect_tcp_interval; + _queued_data_start = 0.0; +} + +//////////////////////////////////////////////////////////////////// +// Function: OSocketStream::set_collect_tcp +// Access: Published +// Description: Enables or disables "collect-tcp" mode. In this +// mode, individual TCP packets are not sent +// immediately, but rather they are collected together +// and accumulated to be sent periodically as one larger +// TCP packet. This cuts down on overhead from the +// TCP/IP protocol, especially if many small packets +// need to be sent on the same connection, but it +// introduces additional latency (since packets must be +// held before they can be sent). +// +// See set_collect_tcp_interval() to specify the +// interval of time for which to hold packets before +// sending them. +// +// If you enable this mode, you may also need to +// periodically call consider_flush() to flush the queue +// if no packets have been sent recently. +//////////////////////////////////////////////////////////////////// +INLINE void OSocketStream:: +set_collect_tcp(bool collect_tcp) { + _collect_tcp = collect_tcp; +} + +//////////////////////////////////////////////////////////////////// +// Function: OSocketStream::get_collect_tcp +// Access: Published +// Description: Returns the current setting of "collect-tcp" mode. +// See set_collect_tcp(). +//////////////////////////////////////////////////////////////////// +INLINE bool OSocketStream:: +get_collect_tcp() const { + return _collect_tcp; +} + +//////////////////////////////////////////////////////////////////// +// Function: OSocketStream::set_collect_tcp_interval +// Access: Published +// Description: Specifies the interval in time, in seconds, for which +// to hold TCP packets before sending all of the +// recently received packets at once. This only has +// meaning if "collect-tcp" mode is enabled; see +// set_collect_tcp(). +//////////////////////////////////////////////////////////////////// +INLINE void OSocketStream:: +set_collect_tcp_interval(double interval) { + _collect_tcp_interval = interval; +} + +//////////////////////////////////////////////////////////////////// +// Function: OSocketStream::get_collect_tcp_interval +// Access: Published +// Description: Returns the interval in time, in seconds, for which +// to hold TCP packets before sending all of the +// recently received packets at once. This only has +// meaning if "collect-tcp" mode is enabled; see +// set_collect_tcp(). +//////////////////////////////////////////////////////////////////// +INLINE double OSocketStream:: +get_collect_tcp_interval() const { + return _collect_tcp_interval; +} + +//////////////////////////////////////////////////////////////////// +// Function: OSocketStream::consider_flush +// Access: Published +// Description: Sends the most recently queued data if enough time +// has elapsed. This only has meaning if +// set_collect_tcp() has been set to true. +//////////////////////////////////////////////////////////////////// +INLINE bool OSocketStream:: +consider_flush() { + if (!_collect_tcp || + ClockObject::get_global_clock()->get_real_time() - _queued_data_start >= _collect_tcp_interval) { + return flush(); + } + return true; +} + +//////////////////////////////////////////////////////////////////// +// Function: OSocketStream::flush +// Access: Published +// Description: Sends the most recently queued data now. This only +// has meaning if set_collect_tcp() has been set to +// true. +//////////////////////////////////////////////////////////////////// +bool OSocketStream:: +flush() { + ostream::flush(); + _queued_data_start = ClockObject::get_global_clock()->get_real_time(); + return !is_closed(); } //////////////////////////////////////////////////////////////////// @@ -44,4 +142,102 @@ OSocketStream(streambuf *buf) : ostream(buf) { INLINE SocketStream:: SocketStream(streambuf *buf) : iostream(buf) { _data_expected = 0; + _collect_tcp = collect_tcp; + _collect_tcp_interval = collect_tcp_interval; + _queued_data_start = 0.0; +} + +//////////////////////////////////////////////////////////////////// +// Function: SocketStream::set_collect_tcp +// Access: Published +// Description: Enables or disables "collect-tcp" mode. In this +// mode, individual TCP packets are not sent +// immediately, but rather they are collected together +// and accumulated to be sent periodically as one larger +// TCP packet. This cuts down on overhead from the +// TCP/IP protocol, especially if many small packets +// need to be sent on the same connection, but it +// introduces additional latency (since packets must be +// held before they can be sent). +// +// See set_collect_tcp_interval() to specify the +// interval of time for which to hold packets before +// sending them. +// +// If you enable this mode, you may also need to +// periodically call consider_flush() to flush the queue +// if no packets have been sent recently. +//////////////////////////////////////////////////////////////////// +INLINE void SocketStream:: +set_collect_tcp(bool collect_tcp) { + _collect_tcp = collect_tcp; +} + +//////////////////////////////////////////////////////////////////// +// Function: SocketStream::get_collect_tcp +// Access: Published +// Description: Returns the current setting of "collect-tcp" mode. +// See set_collect_tcp(). +//////////////////////////////////////////////////////////////////// +INLINE bool SocketStream:: +get_collect_tcp() const { + return _collect_tcp; +} + +//////////////////////////////////////////////////////////////////// +// Function: SocketStream::set_collect_tcp_interval +// Access: Published +// Description: Specifies the interval in time, in seconds, for which +// to hold TCP packets before sending all of the +// recently received packets at once. This only has +// meaning if "collect-tcp" mode is enabled; see +// set_collect_tcp(). +//////////////////////////////////////////////////////////////////// +INLINE void SocketStream:: +set_collect_tcp_interval(double interval) { + _collect_tcp_interval = interval; +} + +//////////////////////////////////////////////////////////////////// +// Function: SocketStream::get_collect_tcp_interval +// Access: Published +// Description: Returns the interval in time, in seconds, for which +// to hold TCP packets before sending all of the +// recently received packets at once. This only has +// meaning if "collect-tcp" mode is enabled; see +// set_collect_tcp(). +//////////////////////////////////////////////////////////////////// +INLINE double SocketStream:: +get_collect_tcp_interval() const { + return _collect_tcp_interval; +} + +//////////////////////////////////////////////////////////////////// +// Function: SocketStream::consider_flush +// Access: Published +// Description: Sends the most recently queued data if enough time +// has elapsed. This only has meaning if +// set_collect_tcp() has been set to true. +//////////////////////////////////////////////////////////////////// +INLINE bool SocketStream:: +consider_flush() { + if (!_collect_tcp || + ClockObject::get_global_clock()->get_real_time() - _queued_data_start >= _collect_tcp_interval) { + return flush(); + } + return true; +} + +//////////////////////////////////////////////////////////////////// +// Function: SocketStream::flush +// Access: Published +// Description: Sends the most recently queued data now. This only +// has meaning if set_collect_tcp() has been set to +// true. +//////////////////////////////////////////////////////////////////// +bool SocketStream:: +flush() { + iostream::flush(); + _queued_data_start = ClockObject::get_global_clock()->get_real_time(); + return !is_closed(); } diff --git a/panda/src/downloader/socketStream.cxx b/panda/src/downloader/socketStream.cxx index 58d9e58c98..35464e53b4 100644 --- a/panda/src/downloader/socketStream.cxx +++ b/panda/src/downloader/socketStream.cxx @@ -159,11 +159,15 @@ bool SocketStream:: send_datagram(const Datagram &dg) { Datagram header; header.add_uint16(dg.get_length()); + + // These two writes don't generate two socket calls, because the + // socket stream is always buffered. write((const char *)header.get_data(), header.get_length()); write((const char *)dg.get_data(), dg.get_length()); - flush(); - return !is_closed(); + // Now flush the buffer immediately, forcing the data to be sent + // (unless collect-tcp mode is in effect). + return consider_flush(); } #endif // HAVE_SSL diff --git a/panda/src/downloader/socketStream.h b/panda/src/downloader/socketStream.h index 1c63661d01..699eb3afc1 100644 --- a/panda/src/downloader/socketStream.h +++ b/panda/src/downloader/socketStream.h @@ -20,6 +20,8 @@ #define SOCKETSTREAM_H #include "pandabase.h" +#include "clockObject.h" +#include "config_express.h" // for collect_tcp // At the present, this module is not compiled if OpenSSL is not // available, since the only current use for it is to implement @@ -68,6 +70,19 @@ PUBLISHED: bool send_datagram(const Datagram &dg); virtual bool is_closed() = 0; + + INLINE void set_collect_tcp(bool collect_tcp); + INLINE bool get_collect_tcp() const; + INLINE void set_collect_tcp_interval(double interval); + INLINE double get_collect_tcp_interval() const; + + INLINE bool consider_flush(); + INLINE bool flush(); + +private: + bool _collect_tcp; + double _collect_tcp_interval; + double _queued_data_start; }; //////////////////////////////////////////////////////////////////// @@ -85,9 +100,21 @@ PUBLISHED: virtual bool is_closed() = 0; + INLINE void set_collect_tcp(bool collect_tcp); + INLINE bool get_collect_tcp() const; + INLINE void set_collect_tcp_interval(double interval); + INLINE double get_collect_tcp_interval() const; + + INLINE bool consider_flush(); + INLINE bool flush(); + private: size_t _data_expected; string _data_so_far; + + bool _collect_tcp; + double _collect_tcp_interval; + double _queued_data_start; }; diff --git a/panda/src/express/config_express.cxx b/panda/src/express/config_express.cxx index 8cc3f7ca58..228b9c6aef 100644 --- a/panda/src/express/config_express.cxx +++ b/panda/src/express/config_express.cxx @@ -194,6 +194,12 @@ config_express.GetBool("keep-temporary-files", false); // this false, except for testing or if you mistrust the new code. const bool use_vfs = config_express.GetBool("use-vfs", true); +// Set this true to enable accumulation of several small consecutive +// TCP datagrams into one large datagram before sending it, to reduce +// overhead from the TCP/IP protocol. See +// Connection::set_collect_tcp() or SocketStream::set_collect_tcp(). +const bool collect_tcp = config_express.GetBool("collect-tcp", false); +const double collect_tcp_interval = config_express.GetDouble("collect-tcp-interval", 0.2); // Returns the configure object for accessing config variables from a // scripting language. diff --git a/panda/src/express/config_express.h b/panda/src/express/config_express.h index a1ef4fc05c..bce6ba60e4 100644 --- a/panda/src/express/config_express.h +++ b/panda/src/express/config_express.h @@ -50,6 +50,9 @@ extern const bool keep_temporary_files; extern EXPCL_PANDAEXPRESS const bool use_vfs; +extern EXPCL_PANDAEXPRESS const bool collect_tcp; +extern EXPCL_PANDAEXPRESS const double collect_tcp_interval; + // Expose the Config variable for Python access. BEGIN_PUBLISH typedef Config::Config ConfigExpress; diff --git a/panda/src/net/connection.cxx b/panda/src/net/connection.cxx index 9d459aa1a6..2d149d5d51 100644 --- a/panda/src/net/connection.cxx +++ b/panda/src/net/connection.cxx @@ -23,12 +23,13 @@ #include "datagramUDPHeader.h" #include "pprerror.h" #include "config_net.h" +#include "config_express.h" // for collect_tcp +#include "clockObject.h" -#include //////////////////////////////////////////////////////////////////// // Function: Connection::Constructor -// Access: Public +// Access: Published // Description: Creates a connection. Normally this constructor // should not be used directly by user code; use one of // the methods in ConnectionManager to make a new @@ -40,11 +41,15 @@ Connection(ConnectionManager *manager, PRFileDesc *socket) : _socket(socket) { _write_mutex = PR_NewLock(); + _collect_tcp = collect_tcp; + _collect_tcp_interval = collect_tcp_interval; + _queued_data_start = 0.0; + _queued_count = 0; } //////////////////////////////////////////////////////////////////// // Function: Connection::Destructor -// Access: Public +// Access: Published // Description: Closes a connection. //////////////////////////////////////////////////////////////////// Connection:: @@ -53,6 +58,8 @@ Connection:: << "Deleting connection " << (void *)this << "\n"; if (_socket != (PRFileDesc *)NULL) { + flush(); + PRStatus result = PR_Close(_socket); if (result != PR_SUCCESS) { pprerror("PR_Close"); @@ -64,7 +71,7 @@ Connection:: //////////////////////////////////////////////////////////////////// // Function: Connection::get_address -// Access: Public +// Access: Published // Description: Returns the address bound to this connection, if it // is a TCP connection. //////////////////////////////////////////////////////////////////// @@ -80,7 +87,7 @@ get_address() const { //////////////////////////////////////////////////////////////////// // Function: Connection::get_manager -// Access: Public +// Access: Published // Description: Returns a pointer to the ConnectionManager object // that serves this connection. //////////////////////////////////////////////////////////////////// @@ -91,7 +98,7 @@ get_manager() const { //////////////////////////////////////////////////////////////////// // Function: Connection::get_socket -// Access: Public +// Access: Published // Description: Returns the internal NSPR pointer that defines the // connection. //////////////////////////////////////////////////////////////////// @@ -100,9 +107,107 @@ get_socket() const { return _socket; } +//////////////////////////////////////////////////////////////////// +// Function: Connection::set_collect_tcp +// Access: Published +// Description: Enables or disables "collect-tcp" mode. In this +// mode, individual TCP packets are not sent +// immediately, but rather they are collected together +// and accumulated to be sent periodically as one larger +// TCP packet. This cuts down on overhead from the +// TCP/IP protocol, especially if many small packets +// need to be sent on the same connection, but it +// introduces additional latency (since packets must be +// held before they can be sent). +// +// See set_collect_tcp_interval() to specify the +// interval of time for which to hold packets before +// sending them. +// +// If you enable this mode, you may also need to +// periodically call consider_flush() to flush the queue +// if no packets have been sent recently. +//////////////////////////////////////////////////////////////////// +void Connection:: +set_collect_tcp(bool collect_tcp) { + _collect_tcp = collect_tcp; +} + +//////////////////////////////////////////////////////////////////// +// Function: Connection::get_collect_tcp +// Access: Published +// Description: Returns the current setting of "collect-tcp" mode. +// See set_collect_tcp(). +//////////////////////////////////////////////////////////////////// +bool Connection:: +get_collect_tcp() const { + return _collect_tcp; +} + +//////////////////////////////////////////////////////////////////// +// Function: Connection::set_collect_tcp_interval +// Access: Published +// Description: Specifies the interval in time, in seconds, for which +// to hold TCP packets before sending all of the +// recently received packets at once. This only has +// meaning if "collect-tcp" mode is enabled; see +// set_collect_tcp(). +//////////////////////////////////////////////////////////////////// +void Connection:: +set_collect_tcp_interval(double interval) { + _collect_tcp_interval = interval; +} + +//////////////////////////////////////////////////////////////////// +// Function: Connection::get_collect_tcp_interval +// Access: Published +// Description: Returns the interval in time, in seconds, for which +// to hold TCP packets before sending all of the +// recently received packets at once. This only has +// meaning if "collect-tcp" mode is enabled; see +// set_collect_tcp(). +//////////////////////////////////////////////////////////////////// +double Connection:: +get_collect_tcp_interval() const { + return _collect_tcp_interval; +} + +//////////////////////////////////////////////////////////////////// +// Function: Connection::consider_flush +// Access: Published +// Description: Sends the most recently queued TCP datagram(s) if +// enough time has elapsed. This only has meaning if +// set_collect_tcp() has been set to true. +//////////////////////////////////////////////////////////////////// +bool Connection:: +consider_flush() { + PR_Lock(_write_mutex); + + if (!_collect_tcp || + ClockObject::get_global_clock()->get_real_time() - _queued_data_start >= _collect_tcp_interval) { + return do_flush(); + } + + PR_Unlock(_write_mutex); + return true; +} + +//////////////////////////////////////////////////////////////////// +// Function: Connection::flush +// Access: Published +// Description: Sends the most recently queued TCP datagram(s) now. +// This only has meaning if set_collect_tcp() has been +// set to true. +//////////////////////////////////////////////////////////////////// +bool Connection:: +flush() { + PR_Lock(_write_mutex); + return do_flush(); +} + //////////////////////////////////////////////////////////////////// // Function: Connection::set_nonblock -// Access: Public +// Access: Published // Description: Sets whether nonblocking I/O should be in effect. //////////////////////////////////////////////////////////////////// void Connection:: @@ -115,7 +220,7 @@ set_nonblock(bool flag) { //////////////////////////////////////////////////////////////////// // Function: Connection::set_linger -// Access: Public +// Access: Published // Description: Sets the time to linger on close if data is present. // If flag is false, when you close a socket with data // available the system attempts to deliver the data to @@ -136,7 +241,7 @@ set_linger(bool flag, double time) { //////////////////////////////////////////////////////////////////// // Function: Connection::set_reuse_addr -// Access: Public +// Access: Published // Description: Sets whether local address reuse is allowed. //////////////////////////////////////////////////////////////////// void Connection:: @@ -149,7 +254,7 @@ set_reuse_addr(bool flag) { //////////////////////////////////////////////////////////////////// // Function: Connection::set_keep_alive -// Access: Public +// Access: Published // Description: Sets whether the connection is periodically tested to // see if it is still alive. //////////////////////////////////////////////////////////////////// @@ -163,7 +268,7 @@ set_keep_alive(bool flag) { //////////////////////////////////////////////////////////////////// // Function: Connection::set_recv_buffer_size -// Access: Public +// Access: Published // Description: Sets the size of the receive buffer, in bytes. //////////////////////////////////////////////////////////////////// void Connection:: @@ -176,7 +281,7 @@ set_recv_buffer_size(int size) { //////////////////////////////////////////////////////////////////// // Function: Connection::set_send_buffer_size -// Access: Public +// Access: Published // Description: Sets the size of the send buffer, in bytes. //////////////////////////////////////////////////////////////////// void Connection:: @@ -189,7 +294,7 @@ set_send_buffer_size(int size) { //////////////////////////////////////////////////////////////////// // Function: Connection::set_ip_time_to_live -// Access: Public +// Access: Published // Description: Sets IP time-to-live. //////////////////////////////////////////////////////////////////// void Connection:: @@ -202,7 +307,7 @@ set_ip_time_to_live(int ttl) { //////////////////////////////////////////////////////////////////// // Function: Connection::set_ip_type_of_service -// Access: Public +// Access: Published // Description: Sets IP type-of-service and precedence. //////////////////////////////////////////////////////////////////// void Connection:: @@ -215,7 +320,7 @@ set_ip_type_of_service(int tos) { //////////////////////////////////////////////////////////////////// // Function: Connection::set_no_delay -// Access: Public +// Access: Published // Description: If flag is true, this disables the Nagle algorithm, // and prevents delaying of send to coalesce packets. //////////////////////////////////////////////////////////////////// @@ -229,7 +334,7 @@ set_no_delay(bool flag) { //////////////////////////////////////////////////////////////////// // Function: Connection::set_max_segment -// Access: Public +// Access: Published // Description: Sets the maximum segment size. //////////////////////////////////////////////////////////////////// void Connection:: @@ -253,65 +358,49 @@ bool Connection:: send_datagram(const NetDatagram &datagram) { nassertr(_socket != (PRFileDesc *)NULL, false); - PR_Lock(_write_mutex); - - PRInt32 bytes_sent; - PRInt32 result; if (PR_GetDescType(_socket) == PR_DESC_SOCKET_UDP) { + // We have to send UDP right away. + PR_Lock(_write_mutex); DatagramUDPHeader header(datagram); - string data = header.get_header() + datagram.get_message(); - bytes_sent = data.length(); + string data; + data += header.get_header(); + data += datagram.get_message(); + + PRInt32 bytes_to_send = data.length(); + PRInt32 result; result = PR_SendTo(_socket, - data.data(), bytes_sent, + data.data(), bytes_to_send, 0, datagram.get_address().get_addr(), PR_INTERVAL_NO_TIMEOUT); + PRErrorCode errcode = PR_GetError(); if (net_cat.is_debug()) { header.verify_datagram(datagram); } - } else { - DatagramTCPHeader header(datagram); - string data = header.get_header() + datagram.get_message(); - bytes_sent = data.length(); - result = PR_Send(_socket, - data.data(), bytes_sent, - 0, - PR_INTERVAL_NO_TIMEOUT); - if (net_cat.is_debug()) { - header.verify_datagram(datagram); - } + PR_Unlock(_write_mutex); + return check_send_error(result, errcode, bytes_to_send); } - PRErrorCode errcode = PR_GetError(); + // We might queue up TCP packets for later sending. + DatagramTCPHeader header(datagram); + + PR_Lock(_write_mutex); + _queued_data += header.get_header(); + _queued_data += datagram.get_message(); + _queued_count++; + + if (net_cat.is_debug()) { + header.verify_datagram(datagram); + } + + if (!_collect_tcp || + ClockObject::get_global_clock()->get_real_time() - _queued_data_start >= _collect_tcp_interval) { + return do_flush(); + } PR_Unlock(_write_mutex); - - if (result < 0) { - if (errcode == PR_CONNECT_RESET_ERROR -#ifdef PR_SOCKET_SHUTDOWN_ERROR - || errcode == PR_SOCKET_SHUTDOWN_ERROR - || errcode == PR_CONNECT_ABORTED_ERROR -#endif - ) { - // The connection has been reset; tell our manager about it - // and ignore it. - if (_manager != (ConnectionManager *)NULL) { - _manager->connection_reset(this); - } - - } else if (errcode != PR_PENDING_INTERRUPT_ERROR) { - pprerror("PR_SendTo"); - } - - return false; - - } else if (result != bytes_sent) { - net_cat.error() << "Not enough bytes sent to socket.\n"; - return false; - } - return true; } @@ -326,31 +415,93 @@ bool Connection:: send_raw_datagram(const NetDatagram &datagram) { nassertr(_socket != (PRFileDesc *)NULL, false); - PR_Lock(_write_mutex); - - PRInt32 bytes_sent; - PRInt32 result; if (PR_GetDescType(_socket) == PR_DESC_SOCKET_UDP) { + // We have to send UDP right away. + string data = datagram.get_message(); - bytes_sent = data.length(); + PRInt32 bytes_to_send = data.length(); + + if (net_cat.is_spam()) { + net_cat.spam() + << "Sending UDP datagram with " + << bytes_to_send << " bytes to " << (void *)this << "\n"; + } + + PR_Lock(_write_mutex); + PRInt32 result; result = PR_SendTo(_socket, - data.data(), bytes_sent, + data.data(), bytes_to_send, 0, datagram.get_address().get_addr(), PR_INTERVAL_NO_TIMEOUT); - } else { - string data = datagram.get_message(); - bytes_sent = data.length(); - result = PR_Send(_socket, - data.data(), bytes_sent, - 0, - PR_INTERVAL_NO_TIMEOUT); + PRErrorCode errcode = PR_GetError(); + + PR_Unlock(_write_mutex); + return check_send_error(result, errcode, bytes_to_send); } + // We might queue up TCP packets for later sending. + + PR_Lock(_write_mutex); + _queued_data += datagram.get_message(); + _queued_count++; + + if (!_collect_tcp || + ClockObject::get_global_clock()->get_real_time() - _queued_data_start >= _collect_tcp_interval) { + return do_flush(); + } + + PR_Unlock(_write_mutex); + return true; +} + +//////////////////////////////////////////////////////////////////// +// Function: Connection::do_flush +// Access: Private +// Description: The private implementation of flush(), this assumes +// the _write_mutex has already been locked on entry. +// It will be unlocked on return. +//////////////////////////////////////////////////////////////////// +bool Connection:: +do_flush() { + PRInt32 bytes_to_send = _queued_data.length(); + if (bytes_to_send == 0) { + _queued_count = 0; + _queued_data_start = ClockObject::get_global_clock()->get_real_time(); + PR_Unlock(_write_mutex); + return true; + } + + if (net_cat.is_spam()) { + net_cat.spam() + << "Sending " << _queued_count << " TCP datagram(s) with " + << bytes_to_send << " total bytes to " << (void *)this << "\n"; + } + + PRInt32 result; + result = PR_Send(_socket, + _queued_data.data(), bytes_to_send, + 0, + PR_INTERVAL_NO_TIMEOUT); PRErrorCode errcode = PR_GetError(); + _queued_data = string(); + _queued_count = 0; + _queued_data_start = ClockObject::get_global_clock()->get_real_time(); + PR_Unlock(_write_mutex); + return check_send_error(result, errcode, bytes_to_send); +} + +//////////////////////////////////////////////////////////////////// +// Function: Connection::check_send_error +// Access: Private +// Description: Checks the return value of a PR_Send() or PR_SendTo() +// call. +//////////////////////////////////////////////////////////////////// +bool Connection:: +check_send_error(PRInt32 result, PRErrorCode errcode, PRInt32 bytes_to_send) { if (result < 0) { if (errcode == PR_CONNECT_RESET_ERROR #ifdef PR_SOCKET_SHUTDOWN_ERROR @@ -370,7 +521,7 @@ send_raw_datagram(const NetDatagram &datagram) { return false; - } else if (result != bytes_sent) { + } else if (result != bytes_to_send) { net_cat.error() << "Not enough bytes sent to socket.\n"; return false; } diff --git a/panda/src/net/connection.h b/panda/src/net/connection.h index c597cc76cc..6c12532fe1 100644 --- a/panda/src/net/connection.h +++ b/panda/src/net/connection.h @@ -19,14 +19,13 @@ #ifndef CONNECTION_H #define CONNECTION_H -#include - -#include - +#include "pandabase.h" +#include "referenceCount.h" #include "netAddress.h" #include #include +#include class ConnectionManager; class NetDatagram; @@ -46,6 +45,14 @@ PUBLISHED: PRFileDesc *get_socket() const; + void set_collect_tcp(bool collect_tcp); + bool get_collect_tcp() const; + void set_collect_tcp_interval(double interval); + double get_collect_tcp_interval() const; + + bool consider_flush(); + bool flush(); + // Socket options. void set_nonblock(bool flag); void set_linger(bool flag, double time); @@ -61,12 +68,20 @@ PUBLISHED: private: bool send_datagram(const NetDatagram &datagram); bool send_raw_datagram(const NetDatagram &datagram); + bool do_flush(); + bool check_send_error(PRInt32 result, PRErrorCode errcode, PRInt32 bytes_to_send); ConnectionManager *_manager; PRFileDesc *_socket; PRLock *_write_mutex; -friend class ConnectionWriter; + bool _collect_tcp; + double _collect_tcp_interval; + double _queued_data_start; + string _queued_data; + int _queued_count; + + friend class ConnectionWriter; }; #endif diff --git a/panda/src/net/connectionWriter.cxx b/panda/src/net/connectionWriter.cxx index 0d56ac425c..c2abe93e0e 100644 --- a/panda/src/net/connectionWriter.cxx +++ b/panda/src/net/connectionWriter.cxx @@ -109,12 +109,6 @@ send(const Datagram &datagram, const PT(Connection) &connection) { nassertr(connection != (Connection *)NULL, false); nassertr(PR_GetDescType(connection->get_socket()) == PR_DESC_SOCKET_TCP, false); - if (net_cat.is_debug()) { - net_cat.debug() - << "Sending TCP datagram of " << datagram.get_length() - << " bytes\n"; - } - NetDatagram copy(datagram); copy.set_connection(connection); @@ -151,12 +145,6 @@ send(const Datagram &datagram, const PT(Connection) &connection, nassertr(connection != (Connection *)NULL, false); nassertr(PR_GetDescType(connection->get_socket()) == PR_DESC_SOCKET_UDP, false); - if (net_cat.is_debug()) { - net_cat.debug() - << "Sending UDP datagram of " << datagram.get_length() - << " bytes\n"; - } - if (PR_GetDescType(connection->get_socket()) == PR_DESC_SOCKET_UDP && (int)datagram.get_length() > maximum_udp_datagram) { net_cat.warning() diff --git a/panda/src/net/queuedConnectionReader.cxx b/panda/src/net/queuedConnectionReader.cxx index 7cdbd73cdc..c77304a95b 100644 --- a/panda/src/net/queuedConnectionReader.cxx +++ b/panda/src/net/queuedConnectionReader.cxx @@ -116,11 +116,13 @@ get_data(Datagram &result) { //////////////////////////////////////////////////////////////////// void QueuedConnectionReader:: receive_datagram(const NetDatagram &datagram) { - if (net_cat.is_debug()) { - net_cat.debug() + /* + if (net_cat.is_spam()) { + net_cat.spam() << "Received datagram of " << datagram.get_length() << " bytes\n"; } + */ #ifdef SIMULATE_NETWORK_DELAY delay_datagram(datagram);