From 97949f7d0dd69050e19a2cd3eb2a3354713fe97f Mon Sep 17 00:00:00 2001 From: David Rose Date: Thu, 26 Mar 2009 18:33:34 +0000 Subject: [PATCH] fix lost message(s) at connection lost --- panda/src/net/connection.cxx | 1 + panda/src/net/connectionListener.cxx | 32 +++++---- panda/src/net/connectionListener.h | 2 +- panda/src/net/connectionManager.cxx | 37 +++++++++- panda/src/net/connectionManager.h | 1 + panda/src/net/connectionReader.cxx | 103 ++++++++++++++++++++------- panda/src/net/connectionReader.h | 11 +-- 7 files changed, 137 insertions(+), 50 deletions(-) diff --git a/panda/src/net/connection.cxx b/panda/src/net/connection.cxx index 1f785d876d..027a45c071 100644 --- a/panda/src/net/connection.cxx +++ b/panda/src/net/connection.cxx @@ -542,6 +542,7 @@ check_send_error(bool okflag) { // Assume any error means the connection has been reset; tell // our manager about it and ignore it. if (_manager != (ConnectionManager *)NULL) { + _manager->flush_read_connection(this); _manager->connection_reset(this, okflag); } return false; diff --git a/panda/src/net/connectionListener.cxx b/panda/src/net/connectionListener.cxx index 6a1d5161e2..7b569fb758 100644 --- a/panda/src/net/connectionListener.cxx +++ b/panda/src/net/connectionListener.cxx @@ -50,10 +50,10 @@ receive_datagram(const NetDatagram &) { // detected on a rendezvous port. In this case, it // performs the accept(). //////////////////////////////////////////////////////////////////// -void ConnectionListener:: +bool ConnectionListener:: process_incoming_data(SocketInfo *sinfo) { Socket_TCP_Listen *socket; - DCAST_INTO_V(socket, sinfo->get_socket()); + DCAST_INTO_R(socket, sinfo->get_socket(), false); Socket_Address addr; Socket_TCP *session = new Socket_TCP; @@ -70,20 +70,22 @@ process_incoming_data(SocketInfo *sinfo) { net_cat.error() << "Error when accepting new connection.\n"; delete session; - - } else { - NetAddress net_addr(addr); - net_cat.info() - << "Received TCP connection from client " << net_addr.get_ip_string() - << " on port " << sinfo->_connection->get_address().get_port() - << "\n"; - - PT(Connection) new_connection = new Connection(_manager, session); - if (_manager != (ConnectionManager *)NULL) { - _manager->new_connection(new_connection); - } - connection_opened(sinfo->_connection, net_addr, new_connection); + finish_socket(sinfo); + return false; } + NetAddress net_addr(addr); + net_cat.info() + << "Received TCP connection from client " << net_addr.get_ip_string() + << " on port " << sinfo->_connection->get_address().get_port() + << "\n"; + + PT(Connection) new_connection = new Connection(_manager, session); + if (_manager != (ConnectionManager *)NULL) { + _manager->new_connection(new_connection); + } + connection_opened(sinfo->_connection, net_addr, new_connection); + finish_socket(sinfo); + return true; } diff --git a/panda/src/net/connectionListener.h b/panda/src/net/connectionListener.h index 18d81dad4c..9b1232bb51 100644 --- a/panda/src/net/connectionListener.h +++ b/panda/src/net/connectionListener.h @@ -42,7 +42,7 @@ protected: const NetAddress &address, const PT(Connection) &new_connection)=0; - virtual void process_incoming_data(SocketInfo *sinfo); + virtual bool process_incoming_data(SocketInfo *sinfo); private: }; diff --git a/panda/src/net/connectionManager.cxx b/panda/src/net/connectionManager.cxx index 6a8257d245..4e05567290 100644 --- a/panda/src/net/connectionManager.cxx +++ b/panda/src/net/connectionManager.cxx @@ -31,7 +31,8 @@ // Description: //////////////////////////////////////////////////////////////////// ConnectionManager:: -ConnectionManager() { +ConnectionManager() : _set_mutex("ConnectionManager::_set_mutex") +{ } //////////////////////////////////////////////////////////////////// @@ -301,6 +302,38 @@ new_connection(const PT(Connection) &connection) { _connections.insert(connection); } +//////////////////////////////////////////////////////////////////// +// Function: ConnectionManager::flush_read_connection +// Access: Protected, Virtual +// Description: An internal function called by ConnectionWriter only +// when a write failure has occurred. This method +// ensures that all of the read data has been flushed +// from the pipe before the connection is fully removed. +//////////////////////////////////////////////////////////////////// +void ConnectionManager:: +flush_read_connection(Connection *connection) { + Readers readers; + { + LightMutexHolder holder(_set_mutex); + Connections::iterator ci = _connections.find(connection); + if (ci == _connections.end()) { + // Already closed, or not part of this ConnectionManager. + return; + } + _connections.erase(ci); + + // Get a copy first, so we can release the lock before traversing. + readers = _readers; + } + Readers::iterator ri; + for (ri = readers.begin(); ri != readers.end(); ++ri) { + (*ri)->flush_read_connection(connection); + } + + Socket_IP *socket = connection->get_socket(); + socket->Close(); +} + //////////////////////////////////////////////////////////////////// // Function: ConnectionManager::connection_reset // Access: Protected, Virtual @@ -328,7 +361,7 @@ connection_reset(const PT(Connection) &connection, bool okflag) { // Turns out we do need to explicitly mark the connection as closed // immediately, rather than waiting for the user to do it, since // otherwise we'll keep trying to listen for noise on the socket and - // we'll always here a "yes" answer. + // we'll always hear a "yes" answer. close_connection(connection); } diff --git a/panda/src/net/connectionManager.h b/panda/src/net/connectionManager.h index 02fa040d8a..f6e939aa16 100644 --- a/panda/src/net/connectionManager.h +++ b/panda/src/net/connectionManager.h @@ -62,6 +62,7 @@ PUBLISHED: protected: void new_connection(const PT(Connection) &connection); + virtual void flush_read_connection(Connection *connection); virtual void connection_reset(const PT(Connection) &connection, bool okflag); diff --git a/panda/src/net/connectionReader.cxx b/panda/src/net/connectionReader.cxx index 143de566f2..9b2cbabc95 100644 --- a/panda/src/net/connectionReader.cxx +++ b/panda/src/net/connectionReader.cxx @@ -386,6 +386,45 @@ get_tcp_header_size() const { return _tcp_header_size; } +//////////////////////////////////////////////////////////////////// +// Function: ConnectionReader::flush_read_connection +// Access: Protected, Virtual +// Description: Attempts to read all the possible data from the +// indicated connection, which has just delivered a +// write error (and has therefore already been closed). +// If the connection is not monitered by this reader, +// does nothing. +//////////////////////////////////////////////////////////////////// +void ConnectionReader:: +flush_read_connection(Connection *connection) { + // Ensure it doesn't get deleted. + SocketInfo sinfo(connection); + + if (!remove_connection(connection)) { + // Not already in the reader. + return; + } + + // The connection was previously in the reader, but has now been + // removed. Now we can flush it completely. We check if there is + // any read data available on just this one socket; we can do this + // right here in this thread, since we've already removed this + // connection from the reader. + + Socket_fdset fdset; + fdset.clear(); + fdset.setForSocket(*(sinfo.get_socket())); + int num_results = fdset.WaitForRead(true, 0); + while (num_results != 0) { + sinfo._busy = true; + if (!process_incoming_data(&sinfo)) { + break; + } + fdset.setForSocket(*(sinfo.get_socket())); + num_results = fdset.WaitForRead(true, 0); + } +} + //////////////////////////////////////////////////////////////////// // Function: ConnectionReader::shutdown // Access: Protected @@ -444,21 +483,23 @@ finish_socket(SocketInfo *sinfo) { // Access: Protected, Virtual // Description: This is run within a thread when the call to // select() indicates there is data available on a -// socket. +// socket. Returns true if the data is read +// successfully, false on failure (for instance, because +// the connection is closed). //////////////////////////////////////////////////////////////////// -void ConnectionReader:: +bool ConnectionReader:: process_incoming_data(SocketInfo *sinfo) { if (_raw_mode) { if (sinfo->is_udp()) { - process_raw_incoming_udp_data(sinfo); + return process_raw_incoming_udp_data(sinfo); } else { - process_raw_incoming_tcp_data(sinfo); + return process_raw_incoming_tcp_data(sinfo); } } else { if (sinfo->is_udp()) { - process_incoming_udp_data(sinfo); + return process_incoming_udp_data(sinfo); } else { - process_incoming_tcp_data(sinfo); + return process_incoming_tcp_data(sinfo); } } } @@ -468,10 +509,10 @@ process_incoming_data(SocketInfo *sinfo) { // Access: Protected // Description: //////////////////////////////////////////////////////////////////// -void ConnectionReader:: +bool ConnectionReader:: process_incoming_udp_data(SocketInfo *sinfo) { Socket_UDP *socket; - DCAST_INTO_V(socket, sinfo->get_socket()); + DCAST_INTO_R(socket, sinfo->get_socket(), false); Socket_Address addr; // Read as many bytes as we can. @@ -482,7 +523,7 @@ process_incoming_udp_data(SocketInfo *sinfo) { if (!okflag) { finish_socket(sinfo); - return; + return false; } else if (bytes_read == 0) { // The socket was closed (!). This shouldn't happen with a UDP @@ -491,7 +532,7 @@ process_incoming_udp_data(SocketInfo *sinfo) { _manager->connection_reset(sinfo->_connection, 0); } finish_socket(sinfo); - return; + return false; } // Since we are not running in raw mode, we decode the header to @@ -501,7 +542,7 @@ process_incoming_udp_data(SocketInfo *sinfo) { net_cat.error() << "Did not read entire header, discarding UDP datagram.\n"; finish_socket(sinfo); - return; + return true; } DatagramUDPHeader header(buffer); @@ -516,7 +557,7 @@ process_incoming_udp_data(SocketInfo *sinfo) { finish_socket(sinfo); if (_shutdown) { - return; + return false; } // And now do whatever we need to do to process the datagram. @@ -528,6 +569,8 @@ process_incoming_udp_data(SocketInfo *sinfo) { datagram.set_address(NetAddress(addr)); receive_datagram(datagram); } + + return true; } //////////////////////////////////////////////////////////////////// @@ -535,10 +578,10 @@ process_incoming_udp_data(SocketInfo *sinfo) { // Access: Protected // Description: //////////////////////////////////////////////////////////////////// -void ConnectionReader:: +bool ConnectionReader:: process_incoming_tcp_data(SocketInfo *sinfo) { Socket_TCP *socket; - DCAST_INTO_V(socket, sinfo->get_socket()); + DCAST_INTO_R(socket, sinfo->get_socket(), false); // Read only the header bytes to start with. char buffer[read_buffer_size]; @@ -563,7 +606,7 @@ process_incoming_tcp_data(SocketInfo *sinfo) { _manager->connection_reset(sinfo->_connection, 0); } finish_socket(sinfo); - return; + return false; } header_bytes_read += bytes_read; @@ -577,7 +620,7 @@ process_incoming_tcp_data(SocketInfo *sinfo) { net_cat.error() << "Did not read entire header, discarding TCP datagram.\n"; finish_socket(sinfo); - return; + return true; } DatagramTCPHeader header(buffer, _tcp_header_size); @@ -609,7 +652,7 @@ process_incoming_tcp_data(SocketInfo *sinfo) { _manager->connection_reset(sinfo->_connection, 0); } finish_socket(sinfo); - return; + return false; } int datagram_bytes = @@ -630,7 +673,7 @@ process_incoming_tcp_data(SocketInfo *sinfo) { finish_socket(sinfo); if (_shutdown) { - return; + return false; } // And now do whatever we need to do to process the datagram. @@ -642,6 +685,8 @@ process_incoming_tcp_data(SocketInfo *sinfo) { datagram.set_address(NetAddress(socket->GetPeerName())); receive_datagram(datagram); } + + return true; } //////////////////////////////////////////////////////////////////// @@ -649,10 +694,10 @@ process_incoming_tcp_data(SocketInfo *sinfo) { // Access: Protected // Description: //////////////////////////////////////////////////////////////////// -void ConnectionReader:: +bool ConnectionReader:: process_raw_incoming_udp_data(SocketInfo *sinfo) { Socket_UDP *socket; - DCAST_INTO_V(socket, sinfo->get_socket()); + DCAST_INTO_R(socket, sinfo->get_socket(), false); Socket_Address addr; // Read as many bytes as we can. @@ -663,7 +708,7 @@ process_raw_incoming_udp_data(SocketInfo *sinfo) { if (!okflag) { finish_socket(sinfo); - return; + return false; } else if (bytes_read == 0) { // The socket was closed (!). This shouldn't happen with a UDP @@ -672,7 +717,7 @@ process_raw_incoming_udp_data(SocketInfo *sinfo) { _manager->connection_reset(sinfo->_connection, 0); } finish_socket(sinfo); - return; + return false; } // In raw mode, we simply extract all the bytes and make that a @@ -684,12 +729,14 @@ process_raw_incoming_udp_data(SocketInfo *sinfo) { finish_socket(sinfo); if (_shutdown) { - return; + return false; } datagram.set_connection(sinfo->_connection); datagram.set_address(NetAddress(addr)); receive_datagram(datagram); + + return true; } //////////////////////////////////////////////////////////////////// @@ -697,10 +744,10 @@ process_raw_incoming_udp_data(SocketInfo *sinfo) { // Access: Protected // Description: //////////////////////////////////////////////////////////////////// -void ConnectionReader:: +bool ConnectionReader:: process_raw_incoming_tcp_data(SocketInfo *sinfo) { Socket_TCP *socket; - DCAST_INTO_V(socket, sinfo->get_socket()); + DCAST_INTO_R(socket, sinfo->get_socket(), false); // Read as many bytes as we can. char buffer[read_buffer_size]; @@ -718,7 +765,7 @@ process_raw_incoming_tcp_data(SocketInfo *sinfo) { _manager->connection_reset(sinfo->_connection, 0); } finish_socket(sinfo); - return; + return false; } // In raw mode, we simply extract all the bytes and make that a @@ -730,12 +777,14 @@ process_raw_incoming_tcp_data(SocketInfo *sinfo) { finish_socket(sinfo); if (_shutdown) { - return; + return false; } datagram.set_connection(sinfo->_connection); datagram.set_address(NetAddress(socket->GetPeerName())); receive_datagram(datagram); + + return true; } //////////////////////////////////////////////////////////////////// diff --git a/panda/src/net/connectionReader.h b/panda/src/net/connectionReader.h index b22978917a..327ccb2a82 100644 --- a/panda/src/net/connectionReader.h +++ b/panda/src/net/connectionReader.h @@ -87,6 +87,7 @@ PUBLISHED: int get_tcp_header_size() const; protected: + virtual void flush_read_connection(Connection *connection); virtual void receive_datagram(const NetDatagram &datagram)=0; class SocketInfo { @@ -105,11 +106,11 @@ protected: void clear_manager(); void finish_socket(SocketInfo *sinfo); - virtual void process_incoming_data(SocketInfo *sinfo); - virtual void process_incoming_udp_data(SocketInfo *sinfo); - virtual void process_incoming_tcp_data(SocketInfo *sinfo); - virtual void process_raw_incoming_udp_data(SocketInfo *sinfo); - virtual void process_raw_incoming_tcp_data(SocketInfo *sinfo); + virtual bool process_incoming_data(SocketInfo *sinfo); + virtual bool process_incoming_udp_data(SocketInfo *sinfo); + virtual bool process_incoming_tcp_data(SocketInfo *sinfo); + virtual bool process_raw_incoming_udp_data(SocketInfo *sinfo); + virtual bool process_raw_incoming_tcp_data(SocketInfo *sinfo); protected: ConnectionManager *_manager;