fix lost message(s) at connection lost

This commit is contained in:
David Rose 2009-03-26 18:33:34 +00:00
parent b83412c44e
commit 97949f7d0d
7 changed files with 137 additions and 50 deletions

View File

@ -542,6 +542,7 @@ check_send_error(bool okflag) {
// Assume any error means the connection has been reset; tell // Assume any error means the connection has been reset; tell
// our manager about it and ignore it. // our manager about it and ignore it.
if (_manager != (ConnectionManager *)NULL) { if (_manager != (ConnectionManager *)NULL) {
_manager->flush_read_connection(this);
_manager->connection_reset(this, okflag); _manager->connection_reset(this, okflag);
} }
return false; return false;

View File

@ -50,10 +50,10 @@ receive_datagram(const NetDatagram &) {
// detected on a rendezvous port. In this case, it // detected on a rendezvous port. In this case, it
// performs the accept(). // performs the accept().
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
void ConnectionListener:: bool ConnectionListener::
process_incoming_data(SocketInfo *sinfo) { process_incoming_data(SocketInfo *sinfo) {
Socket_TCP_Listen *socket; Socket_TCP_Listen *socket;
DCAST_INTO_V(socket, sinfo->get_socket()); DCAST_INTO_R(socket, sinfo->get_socket(), false);
Socket_Address addr; Socket_Address addr;
Socket_TCP *session = new Socket_TCP; Socket_TCP *session = new Socket_TCP;
@ -70,8 +70,10 @@ process_incoming_data(SocketInfo *sinfo) {
net_cat.error() net_cat.error()
<< "Error when accepting new connection.\n"; << "Error when accepting new connection.\n";
delete session; delete session;
finish_socket(sinfo);
return false;
}
} else {
NetAddress net_addr(addr); NetAddress net_addr(addr);
net_cat.info() net_cat.info()
<< "Received TCP connection from client " << net_addr.get_ip_string() << "Received TCP connection from client " << net_addr.get_ip_string()
@ -83,7 +85,7 @@ process_incoming_data(SocketInfo *sinfo) {
_manager->new_connection(new_connection); _manager->new_connection(new_connection);
} }
connection_opened(sinfo->_connection, net_addr, new_connection); connection_opened(sinfo->_connection, net_addr, new_connection);
}
finish_socket(sinfo); finish_socket(sinfo);
return true;
} }

View File

@ -42,7 +42,7 @@ protected:
const NetAddress &address, const NetAddress &address,
const PT(Connection) &new_connection)=0; const PT(Connection) &new_connection)=0;
virtual void process_incoming_data(SocketInfo *sinfo); virtual bool process_incoming_data(SocketInfo *sinfo);
private: private:
}; };

View File

@ -31,7 +31,8 @@
// Description: // Description:
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
ConnectionManager:: ConnectionManager::
ConnectionManager() { ConnectionManager() : _set_mutex("ConnectionManager::_set_mutex")
{
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
@ -301,6 +302,38 @@ new_connection(const PT(Connection) &connection) {
_connections.insert(connection); _connections.insert(connection);
} }
////////////////////////////////////////////////////////////////////
// Function: ConnectionManager::flush_read_connection
// Access: Protected, Virtual
// Description: An internal function called by ConnectionWriter only
// when a write failure has occurred. This method
// ensures that all of the read data has been flushed
// from the pipe before the connection is fully removed.
////////////////////////////////////////////////////////////////////
void ConnectionManager::
flush_read_connection(Connection *connection) {
Readers readers;
{
LightMutexHolder holder(_set_mutex);
Connections::iterator ci = _connections.find(connection);
if (ci == _connections.end()) {
// Already closed, or not part of this ConnectionManager.
return;
}
_connections.erase(ci);
// Get a copy first, so we can release the lock before traversing.
readers = _readers;
}
Readers::iterator ri;
for (ri = readers.begin(); ri != readers.end(); ++ri) {
(*ri)->flush_read_connection(connection);
}
Socket_IP *socket = connection->get_socket();
socket->Close();
}
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function: ConnectionManager::connection_reset // Function: ConnectionManager::connection_reset
// Access: Protected, Virtual // Access: Protected, Virtual
@ -328,7 +361,7 @@ connection_reset(const PT(Connection) &connection, bool okflag) {
// Turns out we do need to explicitly mark the connection as closed // Turns out we do need to explicitly mark the connection as closed
// immediately, rather than waiting for the user to do it, since // immediately, rather than waiting for the user to do it, since
// otherwise we'll keep trying to listen for noise on the socket and // otherwise we'll keep trying to listen for noise on the socket and
// we'll always here a "yes" answer. // we'll always hear a "yes" answer.
close_connection(connection); close_connection(connection);
} }

View File

@ -62,6 +62,7 @@ PUBLISHED:
protected: protected:
void new_connection(const PT(Connection) &connection); void new_connection(const PT(Connection) &connection);
virtual void flush_read_connection(Connection *connection);
virtual void connection_reset(const PT(Connection) &connection, virtual void connection_reset(const PT(Connection) &connection,
bool okflag); bool okflag);

View File

@ -386,6 +386,45 @@ get_tcp_header_size() const {
return _tcp_header_size; return _tcp_header_size;
} }
////////////////////////////////////////////////////////////////////
// Function: ConnectionReader::flush_read_connection
// Access: Protected, Virtual
// Description: Attempts to read all the possible data from the
// indicated connection, which has just delivered a
// write error (and has therefore already been closed).
// If the connection is not monitered by this reader,
// does nothing.
////////////////////////////////////////////////////////////////////
void ConnectionReader::
flush_read_connection(Connection *connection) {
// Ensure it doesn't get deleted.
SocketInfo sinfo(connection);
if (!remove_connection(connection)) {
// Not already in the reader.
return;
}
// The connection was previously in the reader, but has now been
// removed. Now we can flush it completely. We check if there is
// any read data available on just this one socket; we can do this
// right here in this thread, since we've already removed this
// connection from the reader.
Socket_fdset fdset;
fdset.clear();
fdset.setForSocket(*(sinfo.get_socket()));
int num_results = fdset.WaitForRead(true, 0);
while (num_results != 0) {
sinfo._busy = true;
if (!process_incoming_data(&sinfo)) {
break;
}
fdset.setForSocket(*(sinfo.get_socket()));
num_results = fdset.WaitForRead(true, 0);
}
}
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function: ConnectionReader::shutdown // Function: ConnectionReader::shutdown
// Access: Protected // Access: Protected
@ -444,21 +483,23 @@ finish_socket(SocketInfo *sinfo) {
// Access: Protected, Virtual // Access: Protected, Virtual
// Description: This is run within a thread when the call to // Description: This is run within a thread when the call to
// select() indicates there is data available on a // select() indicates there is data available on a
// socket. // socket. Returns true if the data is read
// successfully, false on failure (for instance, because
// the connection is closed).
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
void ConnectionReader:: bool ConnectionReader::
process_incoming_data(SocketInfo *sinfo) { process_incoming_data(SocketInfo *sinfo) {
if (_raw_mode) { if (_raw_mode) {
if (sinfo->is_udp()) { if (sinfo->is_udp()) {
process_raw_incoming_udp_data(sinfo); return process_raw_incoming_udp_data(sinfo);
} else { } else {
process_raw_incoming_tcp_data(sinfo); return process_raw_incoming_tcp_data(sinfo);
} }
} else { } else {
if (sinfo->is_udp()) { if (sinfo->is_udp()) {
process_incoming_udp_data(sinfo); return process_incoming_udp_data(sinfo);
} else { } else {
process_incoming_tcp_data(sinfo); return process_incoming_tcp_data(sinfo);
} }
} }
} }
@ -468,10 +509,10 @@ process_incoming_data(SocketInfo *sinfo) {
// Access: Protected // Access: Protected
// Description: // Description:
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
void ConnectionReader:: bool ConnectionReader::
process_incoming_udp_data(SocketInfo *sinfo) { process_incoming_udp_data(SocketInfo *sinfo) {
Socket_UDP *socket; Socket_UDP *socket;
DCAST_INTO_V(socket, sinfo->get_socket()); DCAST_INTO_R(socket, sinfo->get_socket(), false);
Socket_Address addr; Socket_Address addr;
// Read as many bytes as we can. // Read as many bytes as we can.
@ -482,7 +523,7 @@ process_incoming_udp_data(SocketInfo *sinfo) {
if (!okflag) { if (!okflag) {
finish_socket(sinfo); finish_socket(sinfo);
return; return false;
} else if (bytes_read == 0) { } else if (bytes_read == 0) {
// The socket was closed (!). This shouldn't happen with a UDP // The socket was closed (!). This shouldn't happen with a UDP
@ -491,7 +532,7 @@ process_incoming_udp_data(SocketInfo *sinfo) {
_manager->connection_reset(sinfo->_connection, 0); _manager->connection_reset(sinfo->_connection, 0);
} }
finish_socket(sinfo); finish_socket(sinfo);
return; return false;
} }
// Since we are not running in raw mode, we decode the header to // Since we are not running in raw mode, we decode the header to
@ -501,7 +542,7 @@ process_incoming_udp_data(SocketInfo *sinfo) {
net_cat.error() net_cat.error()
<< "Did not read entire header, discarding UDP datagram.\n"; << "Did not read entire header, discarding UDP datagram.\n";
finish_socket(sinfo); finish_socket(sinfo);
return; return true;
} }
DatagramUDPHeader header(buffer); DatagramUDPHeader header(buffer);
@ -516,7 +557,7 @@ process_incoming_udp_data(SocketInfo *sinfo) {
finish_socket(sinfo); finish_socket(sinfo);
if (_shutdown) { if (_shutdown) {
return; return false;
} }
// And now do whatever we need to do to process the datagram. // And now do whatever we need to do to process the datagram.
@ -528,6 +569,8 @@ process_incoming_udp_data(SocketInfo *sinfo) {
datagram.set_address(NetAddress(addr)); datagram.set_address(NetAddress(addr));
receive_datagram(datagram); receive_datagram(datagram);
} }
return true;
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
@ -535,10 +578,10 @@ process_incoming_udp_data(SocketInfo *sinfo) {
// Access: Protected // Access: Protected
// Description: // Description:
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
void ConnectionReader:: bool ConnectionReader::
process_incoming_tcp_data(SocketInfo *sinfo) { process_incoming_tcp_data(SocketInfo *sinfo) {
Socket_TCP *socket; Socket_TCP *socket;
DCAST_INTO_V(socket, sinfo->get_socket()); DCAST_INTO_R(socket, sinfo->get_socket(), false);
// Read only the header bytes to start with. // Read only the header bytes to start with.
char buffer[read_buffer_size]; char buffer[read_buffer_size];
@ -563,7 +606,7 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
_manager->connection_reset(sinfo->_connection, 0); _manager->connection_reset(sinfo->_connection, 0);
} }
finish_socket(sinfo); finish_socket(sinfo);
return; return false;
} }
header_bytes_read += bytes_read; header_bytes_read += bytes_read;
@ -577,7 +620,7 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
net_cat.error() net_cat.error()
<< "Did not read entire header, discarding TCP datagram.\n"; << "Did not read entire header, discarding TCP datagram.\n";
finish_socket(sinfo); finish_socket(sinfo);
return; return true;
} }
DatagramTCPHeader header(buffer, _tcp_header_size); DatagramTCPHeader header(buffer, _tcp_header_size);
@ -609,7 +652,7 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
_manager->connection_reset(sinfo->_connection, 0); _manager->connection_reset(sinfo->_connection, 0);
} }
finish_socket(sinfo); finish_socket(sinfo);
return; return false;
} }
int datagram_bytes = int datagram_bytes =
@ -630,7 +673,7 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
finish_socket(sinfo); finish_socket(sinfo);
if (_shutdown) { if (_shutdown) {
return; return false;
} }
// And now do whatever we need to do to process the datagram. // And now do whatever we need to do to process the datagram.
@ -642,6 +685,8 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
datagram.set_address(NetAddress(socket->GetPeerName())); datagram.set_address(NetAddress(socket->GetPeerName()));
receive_datagram(datagram); receive_datagram(datagram);
} }
return true;
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
@ -649,10 +694,10 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
// Access: Protected // Access: Protected
// Description: // Description:
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
void ConnectionReader:: bool ConnectionReader::
process_raw_incoming_udp_data(SocketInfo *sinfo) { process_raw_incoming_udp_data(SocketInfo *sinfo) {
Socket_UDP *socket; Socket_UDP *socket;
DCAST_INTO_V(socket, sinfo->get_socket()); DCAST_INTO_R(socket, sinfo->get_socket(), false);
Socket_Address addr; Socket_Address addr;
// Read as many bytes as we can. // Read as many bytes as we can.
@ -663,7 +708,7 @@ process_raw_incoming_udp_data(SocketInfo *sinfo) {
if (!okflag) { if (!okflag) {
finish_socket(sinfo); finish_socket(sinfo);
return; return false;
} else if (bytes_read == 0) { } else if (bytes_read == 0) {
// The socket was closed (!). This shouldn't happen with a UDP // The socket was closed (!). This shouldn't happen with a UDP
@ -672,7 +717,7 @@ process_raw_incoming_udp_data(SocketInfo *sinfo) {
_manager->connection_reset(sinfo->_connection, 0); _manager->connection_reset(sinfo->_connection, 0);
} }
finish_socket(sinfo); finish_socket(sinfo);
return; return false;
} }
// In raw mode, we simply extract all the bytes and make that a // In raw mode, we simply extract all the bytes and make that a
@ -684,12 +729,14 @@ process_raw_incoming_udp_data(SocketInfo *sinfo) {
finish_socket(sinfo); finish_socket(sinfo);
if (_shutdown) { if (_shutdown) {
return; return false;
} }
datagram.set_connection(sinfo->_connection); datagram.set_connection(sinfo->_connection);
datagram.set_address(NetAddress(addr)); datagram.set_address(NetAddress(addr));
receive_datagram(datagram); receive_datagram(datagram);
return true;
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
@ -697,10 +744,10 @@ process_raw_incoming_udp_data(SocketInfo *sinfo) {
// Access: Protected // Access: Protected
// Description: // Description:
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
void ConnectionReader:: bool ConnectionReader::
process_raw_incoming_tcp_data(SocketInfo *sinfo) { process_raw_incoming_tcp_data(SocketInfo *sinfo) {
Socket_TCP *socket; Socket_TCP *socket;
DCAST_INTO_V(socket, sinfo->get_socket()); DCAST_INTO_R(socket, sinfo->get_socket(), false);
// Read as many bytes as we can. // Read as many bytes as we can.
char buffer[read_buffer_size]; char buffer[read_buffer_size];
@ -718,7 +765,7 @@ process_raw_incoming_tcp_data(SocketInfo *sinfo) {
_manager->connection_reset(sinfo->_connection, 0); _manager->connection_reset(sinfo->_connection, 0);
} }
finish_socket(sinfo); finish_socket(sinfo);
return; return false;
} }
// In raw mode, we simply extract all the bytes and make that a // In raw mode, we simply extract all the bytes and make that a
@ -730,12 +777,14 @@ process_raw_incoming_tcp_data(SocketInfo *sinfo) {
finish_socket(sinfo); finish_socket(sinfo);
if (_shutdown) { if (_shutdown) {
return; return false;
} }
datagram.set_connection(sinfo->_connection); datagram.set_connection(sinfo->_connection);
datagram.set_address(NetAddress(socket->GetPeerName())); datagram.set_address(NetAddress(socket->GetPeerName()));
receive_datagram(datagram); receive_datagram(datagram);
return true;
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////

View File

@ -87,6 +87,7 @@ PUBLISHED:
int get_tcp_header_size() const; int get_tcp_header_size() const;
protected: protected:
virtual void flush_read_connection(Connection *connection);
virtual void receive_datagram(const NetDatagram &datagram)=0; virtual void receive_datagram(const NetDatagram &datagram)=0;
class SocketInfo { class SocketInfo {
@ -105,11 +106,11 @@ protected:
void clear_manager(); void clear_manager();
void finish_socket(SocketInfo *sinfo); void finish_socket(SocketInfo *sinfo);
virtual void process_incoming_data(SocketInfo *sinfo); virtual bool process_incoming_data(SocketInfo *sinfo);
virtual void process_incoming_udp_data(SocketInfo *sinfo); virtual bool process_incoming_udp_data(SocketInfo *sinfo);
virtual void process_incoming_tcp_data(SocketInfo *sinfo); virtual bool process_incoming_tcp_data(SocketInfo *sinfo);
virtual void process_raw_incoming_udp_data(SocketInfo *sinfo); virtual bool process_raw_incoming_udp_data(SocketInfo *sinfo);
virtual void process_raw_incoming_tcp_data(SocketInfo *sinfo); virtual bool process_raw_incoming_tcp_data(SocketInfo *sinfo);
protected: protected:
ConnectionManager *_manager; ConnectionManager *_manager;