remove send_raw, replace with set_raw_mode

This commit is contained in:
David Rose 2001-07-23 22:59:28 +00:00
parent 9f8953c44b
commit 5663283955
7 changed files with 218 additions and 56 deletions

View File

@ -81,6 +81,7 @@ ConnectionReader::
ConnectionReader(ConnectionManager *manager, int num_threads) :
_manager(manager)
{
_raw_mode = false;
_polling = (num_threads <= 0);
_shutdown = false;
@ -321,6 +322,30 @@ get_num_threads() const {
return _threads.size();
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionReader::set_raw_mode
// Access: Public
// Description: Sets the ConnectionReader into raw mode (or turns off
// raw mode). In raw mode, datagram headers are not
// expected; instead, all the data available on the pipe
// is treated as a single datagram.
////////////////////////////////////////////////////////////////////
void ConnectionReader::
set_raw_mode(bool mode) {
_raw_mode = mode;
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionReader::get_raw_mode
// Access: Public
// Description: Returns the current setting of the raw mode flag.
// See set_raw_mode().
////////////////////////////////////////////////////////////////////
bool ConnectionReader::
get_raw_mode() const {
return _raw_mode;
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionReader::shutdown
// Access: Protected
@ -412,10 +437,18 @@ finish_socket(SocketInfo *sinfo) {
////////////////////////////////////////////////////////////////////
void ConnectionReader::
process_incoming_data(SocketInfo *sinfo) {
if (sinfo->is_udp()) {
process_incoming_udp_data(sinfo);
if (_raw_mode) {
if (sinfo->is_udp()) {
process_raw_incoming_udp_data(sinfo);
} else {
process_raw_incoming_tcp_data(sinfo);
}
} else {
process_incoming_tcp_data(sinfo);
if (sinfo->is_udp()) {
process_incoming_udp_data(sinfo);
} else {
process_incoming_tcp_data(sinfo);
}
}
}
@ -454,31 +487,31 @@ process_incoming_udp_data(SocketInfo *sinfo) {
return;
}
// Now we must decode the header to determine how big the datagram
// is. This means we must have read at least a full header.
// Since we are not running in raw mode, we decode the header to
// determine how big the datagram is. This means we must have read
// at least a full header.
if (bytes_read < datagram_udp_header_size) {
net_cat.error()
<< "Did not read entire header, discarding UDP datagram.\n";
finish_socket(sinfo);
return;
}
DatagramUDPHeader header(buffer);
PRInt8 *dp = buffer + datagram_udp_header_size;
bytes_read -= datagram_udp_header_size;
NetDatagram datagram(dp, bytes_read);
if (_shutdown) {
finish_socket(sinfo);
return;
}
// Now that we've read all the data, it's time to finish the socket
// so another thread can read the next datagram.
finish_socket(sinfo);
if (_shutdown) {
return;
}
// And now do whatever we need to do to process the datagram.
if (!header.verify_datagram(datagram)) {
net_cat.error()
@ -614,15 +647,14 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
}
}
if (_shutdown) {
finish_socket(sinfo);
return;
}
// Now that we've read all the data, it's time to finish the socket
// so another thread can read the next datagram.
finish_socket(sinfo);
if (_shutdown) {
return;
}
// And now do whatever we need to do to process the datagram.
if (!header.verify_datagram(datagram)) {
net_cat.error()
@ -634,6 +666,113 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
}
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionReader::process_raw_incoming_udp_data
// Access: Protected
// Description:
////////////////////////////////////////////////////////////////////
void ConnectionReader::
process_raw_incoming_udp_data(SocketInfo *sinfo) {
PRFileDesc *socket = sinfo->get_socket();
PRNetAddr addr;
// Read as many bytes as we can.
PRInt8 buffer[read_buffer_size];
PRInt32 bytes_read;
bytes_read = PR_RecvFrom(socket, buffer, read_buffer_size, 0,
&addr, PR_INTERVAL_NO_TIMEOUT);
if (bytes_read < 0) {
PRErrorCode errcode = PR_GetError();
if (errcode != PR_PENDING_INTERRUPT_ERROR) {
pprerror("PR_RecvFrom");
}
finish_socket(sinfo);
return;
} else if (bytes_read == 0) {
// The socket was closed (!). This shouldn't happen with a UDP
// connection. Oh well. Report that and return.
if (_manager != (ConnectionManager *)NULL) {
_manager->connection_reset(sinfo->_connection);
}
finish_socket(sinfo);
return;
}
// In raw mode, we simply extract all the bytes and make that a
// datagram.
NetDatagram datagram(buffer, bytes_read);
// Now that we've read all the data, it's time to finish the socket
// so another thread can read the next datagram.
finish_socket(sinfo);
if (_shutdown) {
return;
}
datagram.set_connection(sinfo->_connection);
datagram.set_address(NetAddress(addr));
receive_datagram(datagram);
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionReader::process_raw_incoming_tcp_data
// Access: Protected
// Description:
////////////////////////////////////////////////////////////////////
void ConnectionReader::
process_raw_incoming_tcp_data(SocketInfo *sinfo) {
PRFileDesc *socket = sinfo->get_socket();
PRNetAddr addr;
// Read as many bytes as we can.
PRInt8 buffer[read_buffer_size];
PRInt32 bytes_read;
if (PR_GetSockName(socket, &addr) != PR_SUCCESS) {
pprerror("PR_GetSockName");
}
bytes_read = PR_Recv(socket, buffer, read_buffer_size, 0,
PR_INTERVAL_NO_TIMEOUT);
if (bytes_read < 0) {
PRErrorCode errcode = PR_GetError();
if (errcode != PR_PENDING_INTERRUPT_ERROR) {
pprerror("PR_RecvFrom");
}
finish_socket(sinfo);
return;
} else if (bytes_read == 0) {
// The socket was closed. Report that and return.
if (_manager != (ConnectionManager *)NULL) {
_manager->connection_reset(sinfo->_connection);
}
finish_socket(sinfo);
return;
}
// In raw mode, we simply extract all the bytes and make that a
// datagram.
NetDatagram datagram(buffer, bytes_read);
// Now that we've read all the data, it's time to finish the socket
// so another thread can read the next datagram.
finish_socket(sinfo);
if (_shutdown) {
return;
}
datagram.set_connection(sinfo->_connection);
datagram.set_address(NetAddress(addr));
receive_datagram(datagram);
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionReader::thread_start

View File

@ -82,6 +82,9 @@ PUBLISHED:
bool is_polling() const;
int get_num_threads() const;
void set_raw_mode(bool mode);
bool get_raw_mode() const;
protected:
virtual void receive_datagram(const NetDatagram &datagram)=0;
@ -103,6 +106,8 @@ protected:
virtual void process_incoming_data(SocketInfo *sinfo);
virtual void process_incoming_udp_data(SocketInfo *sinfo);
virtual void process_incoming_tcp_data(SocketInfo *sinfo);
virtual void process_raw_incoming_udp_data(SocketInfo *sinfo);
virtual void process_raw_incoming_tcp_data(SocketInfo *sinfo);
private:
static void thread_start(void *data);
@ -117,6 +122,7 @@ protected:
ConnectionManager *_manager;
private:
bool _raw_mode;
bool _shutdown;
typedef pvector<PRThread *> Threads;

View File

@ -38,6 +38,7 @@ ConnectionWriter::
ConnectionWriter(ConnectionManager *manager, int num_threads) :
_manager(manager)
{
_raw_mode = false;
_immediate = (num_threads <= 0);
for (int i = 0; i < num_threads; i++) {
@ -118,7 +119,11 @@ send(const Datagram &datagram, const PT(Connection) &connection) {
copy.set_connection(connection);
if (_immediate) {
return connection->send_datagram(copy);
if (_raw_mode) {
return connection->send_raw_datagram(copy);
} else {
return connection->send_datagram(copy);
}
} else {
return _queue.insert(copy);
}
@ -166,43 +171,16 @@ send(const Datagram &datagram, const PT(Connection) &connection,
copy.set_address(address);
if (_immediate) {
return connection->send_datagram(copy);
if (_raw_mode) {
return connection->send_raw_datagram(copy);
} else {
return connection->send_datagram(copy);
}
} else {
return _queue.insert(copy);
}
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionWriter::send_raw
// Access: Public
// Description: Enqueues a datagram for transmission on the indicated
// socket, *without* sending the Datagram header. This
// will not be intelligible to a ConnectionReader on the
// other end, which will expect to receive a Datagram
// header. However, it may be necessary to send raw
// data to some other kind of client (like a proxy
// server).
//
// The data is always sent immediately, regardless of
// whether this is a queued connection or not.
////////////////////////////////////////////////////////////////////
bool ConnectionWriter::
send_raw(const Datagram &datagram, const PT(Connection) &connection) {
nassertr(connection != (Connection *)NULL, false);
nassertr(PR_GetDescType(connection->get_socket()) == PR_DESC_SOCKET_TCP, false);
if (net_cat.is_debug()) {
net_cat.debug()
<< "Sending TCP raw datagram of " << datagram.get_length()
<< " bytes\n";
}
NetDatagram copy(datagram);
copy.set_connection(connection);
return connection->send_raw_datagram(copy);
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionWriter::is_valid_for_udp
// Access: Public
@ -247,6 +225,36 @@ get_num_threads() const {
return _threads.size();
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionWriter::set_raw_mode
// Access: Public
// Description: Sets the ConnectionWriter into raw mode (or turns off
// raw mode). In raw mode, datagrams are not sent along
// with their headers; the bytes in the datagram are
// simply sent down the pipe.
//
// Setting the ConnectionWriter to raw mode must be done
// with care. This can only be done when the matching
// ConnectionReader is also set to raw mode, or when the
// ConnectionWriter is communicating to a process that
// does not expect datagrams.
////////////////////////////////////////////////////////////////////
void ConnectionWriter::
set_raw_mode(bool mode) {
_raw_mode = mode;
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionWriter::get_raw_mode
// Access: Public
// Description: Returns the current setting of the raw mode flag.
// See set_raw_mode().
////////////////////////////////////////////////////////////////////
bool ConnectionWriter::
get_raw_mode() const {
return _raw_mode;
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionWriter::clear_manager
// Access: Protected
@ -287,6 +295,10 @@ thread_run() {
NetDatagram datagram;
while (_queue.extract(datagram)) {
datagram.get_connection()->send_datagram(datagram);
if (_raw_mode) {
datagram.get_connection()->send_raw_datagram(datagram);
} else {
datagram.get_connection()->send_datagram(datagram);
}
}
}

View File

@ -54,14 +54,15 @@ PUBLISHED:
const PT(Connection) &connection,
const NetAddress &address);
bool send_raw(const Datagram &datagram, const PT(Connection) &connection);
bool is_valid_for_udp(const Datagram &datagram) const;
ConnectionManager *get_manager() const;
bool is_immediate() const;
int get_num_threads() const;
void set_raw_mode(bool mode);
bool get_raw_mode() const;
protected:
void clear_manager();
@ -74,6 +75,7 @@ protected:
ConnectionManager *_manager;
private:
bool _raw_mode;
DatagramQueue _queue;
typedef pvector<PRThread *> Threads;

View File

@ -85,6 +85,7 @@ main(int argc, char *argv[]) {
if (reader.get_data(datagram)) {
nout << "Got datagram " << datagram << "from "
<< datagram.get_address() << "\n";
datagram.dump_hex(nout);
}
}

View File

@ -90,6 +90,7 @@ main(int argc, char *argv[]) {
nout << "Got datagram " << datagram << "from "
<< datagram.get_address() << ", sending to "
<< clients.size() << " clients.\n";
datagram.dump_hex(nout);
Clients::iterator ci;
for (ci = clients.begin(); ci != clients.end(); ++ci) {

View File

@ -84,6 +84,7 @@ main(int argc, char *argv[]) {
if (reader.get_data(datagram)) {
nout << "Got datagram " << datagram << "from "
<< datagram.get_address() << "\n";
datagram.dump_hex(nout);
}
}