more aggressively close threads

This commit is contained in:
David Rose 2009-03-24 22:24:34 +00:00
parent 4b48520ada
commit 7d1937626a
7 changed files with 94 additions and 9 deletions

View File

@ -542,6 +542,7 @@ check_send_error(bool okflag) {
// Assume any error means the connection has been reset; tell
// our manager about it and ignore it.
if (_manager != (ConnectionManager *)NULL) {
_manager->flush_read_connection(this);
_manager->connection_reset(this, okflag);
}
return false;

View File

@ -301,6 +301,35 @@ new_connection(const PT(Connection) &connection) {
_connections.insert(connection);
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionManager::flush_read_connection
// Access: Protected, Virtual
// Description: An internal function called by ConnectionWriter only
// when a write failure has occurred. This method
// ensures that all of the read data has been flushed
// from the pipe before the connection is fully removed.
////////////////////////////////////////////////////////////////////
void ConnectionManager::
flush_read_connection(Connection *connection) {
{
LightMutexHolder holder(_set_mutex);
Connections::iterator ci = _connections.find(connection);
if (ci == _connections.end()) {
// Already closed, or not part of this ConnectionManager.
return;
}
_connections.erase(ci);
Readers::iterator ri;
for (ri = _readers.begin(); ri != _readers.end(); ++ri) {
(*ri)->flush_read_connection(connection);
}
}
Socket_IP *socket = connection->get_socket();
socket->Close();
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionManager::connection_reset
// Access: Protected, Virtual

View File

@ -62,6 +62,7 @@ PUBLISHED:
protected:
void new_connection(const PT(Connection) &connection);
virtual void flush_read_connection(Connection *connection);
virtual void connection_reset(const PT(Connection) &connection,
bool okflag);

View File

@ -386,6 +386,46 @@ get_tcp_header_size() const {
return _tcp_header_size;
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionReader::flush_read_connection
// Access: Protected, Virtual
// Description: Attempts to read all the possible data from the
// indicated connection, which has just delivered a
// write error (and has therefore already been closed).
// If the connection is not monitered by this reader,
// does nothing.
////////////////////////////////////////////////////////////////////
void ConnectionReader::
flush_read_connection(Connection *connection) {
// Ensure it doesn't get deleted.
SocketInfo sinfo(connection);
if (!remove_connection(connection)) {
// Not already in the reader.
return;
}
// The connection was previously in the reader, but has now been
// removed. Now we can flush it completely. We check if there is
// any read data available on just this one socket; we can do this
// right here in this thread, since we've already removed this
// connection from the reader.
Socket_fdset fdset;
fdset.clear();
fdset.setForSocket(*(sinfo.get_socket()));
int num_results = fdset.WaitForRead(true, 0);
cerr << "num_results = " << num_results << "\n";
while (num_results != 0) {
sinfo._busy = true;
process_incoming_data(&sinfo);
fdset.setForSocket(*(sinfo.get_socket()));
num_results = fdset.WaitForRead(true, 0);
cerr << "b num_results = " << num_results << "\n";
}
cerr << "done\n";
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionReader::shutdown
// Access: Protected

View File

@ -87,6 +87,7 @@ PUBLISHED:
int get_tcp_header_size() const;
protected:
virtual void flush_read_connection(Connection *connection);
virtual void receive_datagram(const NetDatagram &datagram)=0;
class SocketInfo {

View File

@ -96,15 +96,7 @@ ConnectionWriter::
_manager->remove_writer(this);
}
// First, shutdown the queue. This will tell our threads they're
// done.
_queue.shutdown();
// Now wait for all threads to terminate.
Threads::iterator ti;
for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
(*ti)->join();
}
shutdown();
}
////////////////////////////////////////////////////////////////////
@ -341,6 +333,7 @@ get_tcp_header_size() const {
void ConnectionWriter::
clear_manager() {
_manager = (ConnectionManager *)NULL;
shutdown();
}
////////////////////////////////////////////////////////////////////
@ -362,3 +355,22 @@ thread_run(int thread_index) {
}
}
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionWriter::shutdown
// Access: Private
// Description: Stops all the threads and cleans them up.
////////////////////////////////////////////////////////////////////
void ConnectionWriter::
shutdown() {
// First, shutdown the queue. This will tell our threads they're
// done.
_queue.shutdown();
// Now wait for all threads to terminate.
Threads::iterator ti;
for (ti = _threads.begin(); ti != _threads.end(); ++ti) {
(*ti)->join();
}
_threads.clear();
}

View File

@ -71,6 +71,7 @@ protected:
private:
void thread_run(int thread_index);
bool send_datagram(const NetDatagram &datagram);
void shutdown();
protected:
ConnectionManager *_manager;