DatagramGenerator/SinkNet

This commit is contained in:
David Rose 2009-02-17 16:27:31 +00:00
parent 7f3c5d660f
commit 0d95f17e19
14 changed files with 419 additions and 37 deletions

View File

@ -28,7 +28,7 @@ class VirtualFile;
// from the net
////////////////////////////////////////////////////////////////////
class EXPCL_PANDAEXPRESS DatagramGenerator {
public:
PUBLISHED:
INLINE DatagramGenerator();
virtual ~DatagramGenerator();

View File

@ -26,7 +26,7 @@
// or across the net
////////////////////////////////////////////////////////////////////
class EXPCL_PANDAEXPRESS DatagramSink {
public:
PUBLISHED:
INLINE DatagramSink();
virtual ~DatagramSink();

View File

@ -13,11 +13,14 @@
#define SOURCES \
config_net.h connection.h connectionListener.h \
connectionManager.N connectionManager.h connectionReader.h \
connectionManager.N connectionManager.h \
connectionReader.I connectionReader.h \
connectionWriter.h datagramQueue.h \
datagramTCPHeader.I datagramTCPHeader.h \
datagramUDPHeader.I datagramUDPHeader.h \
netAddress.h netDatagram.I netDatagram.h \
datagramGeneratorNet.I datagramGeneratorNet.h \
datagramSinkNet.I datagramSinkNet.h \
queuedConnectionListener.I \
queuedConnectionListener.h queuedConnectionManager.h \
queuedConnectionReader.h recentConnectionReader.h \
@ -28,17 +31,22 @@
connectionManager.cxx connectionReader.cxx \
connectionWriter.cxx datagramQueue.cxx datagramTCPHeader.cxx \
datagramUDPHeader.cxx netAddress.cxx netDatagram.cxx \
datagramGeneratorNet.cxx \
datagramSinkNet.cxx \
queuedConnectionListener.cxx \
queuedConnectionManager.cxx queuedConnectionReader.cxx \
recentConnectionReader.cxx
#define INSTALL_HEADERS \
config_net.h connection.h connectionListener.h connectionManager.h \
connectionReader.h connectionWriter.h datagramQueue.h \
connectionReader.I connectionReader.h \
connectionWriter.h datagramQueue.h \
datagramTCPHeader.I datagramTCPHeader.h \
datagramUDPHeader.I datagramUDPHeader.h \
netAddress.h netDatagram.I \
netDatagram.h queuedConnectionListener.I \
datagramGeneratorNet.I datagramGeneratorNet.h \
datagramSinkNet.I datagramSinkNet.h \
queuedConnectionListener.h queuedConnectionManager.h \
queuedConnectionReader.h queuedReturn.I queuedReturn.h \
recentConnectionReader.h

View File

@ -0,0 +1,25 @@
// Filename: connectionReader.I
// Created by: drose (15Feb09)
//
////////////////////////////////////////////////////////////////////
//
// PANDA 3D SOFTWARE
// Copyright (c) Carnegie Mellon University. All rights reserved.
//
// All use of this software is subject to the terms of the revised BSD
// license. You should have received a copy of this license along
// with this source code in a file named "LICENSE."
//
////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////
// Function: ConnectionReader::is_polling
// Access: Public
// Description: Returns true if the reader is a polling reader,
// i.e. it has no threads.
////////////////////////////////////////////////////////////////////
INLINE bool ConnectionReader::
is_polling() const {
return _polling;
}

View File

@ -21,6 +21,7 @@
#include "trueClock.h"
#include "socket_udp.h"
#include "socket_tcp.h"
#include "mutexHolder.h"
#include "lightMutexHolder.h"
#include "pnotify.h"
#include "atomicAdjust.h"
@ -322,17 +323,6 @@ get_manager() const {
return _manager;
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionReader::is_polling
// Access: Public
// Description: Returns true if the reader is a polling reader,
// i.e. it has no threads.
////////////////////////////////////////////////////////////////////
bool ConnectionReader::
is_polling() const {
return _polling;
}
////////////////////////////////////////////////////////////////////
// Function: ConnectionReader::get_num_threads
// Access: Public
@ -560,7 +550,7 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
socket->RecvData(buffer + header_bytes_read,
_tcp_header_size - header_bytes_read);
#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
while (bytes_read <= 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR) {
while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR) {
Thread::force_yield();
bytes_read = socket->RecvData(buffer + header_bytes_read,
_tcp_header_size - header_bytes_read);
@ -603,7 +593,7 @@ process_incoming_tcp_data(SocketInfo *sinfo) {
socket->RecvData(buffer, min(read_buffer_size,
(int)(size - datagram.get_length())));
#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
while (bytes_read <= 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR) {
while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR) {
Thread::force_yield();
bytes_read =
socket->RecvData(buffer, min(read_buffer_size,
@ -716,7 +706,7 @@ process_raw_incoming_tcp_data(SocketInfo *sinfo) {
char buffer[read_buffer_size];
int bytes_read = socket->RecvData(buffer, read_buffer_size);
#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
while (bytes_read <= 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR) {
while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR) {
Thread::force_yield();
bytes_read = socket->RecvData(buffer, read_buffer_size);
}
@ -784,7 +774,7 @@ ConnectionReader::SocketInfo *ConnectionReader::
get_next_available_socket(bool allow_block, int current_thread_index) {
// Go to sleep on the select() mutex. This guarantees that only one
// thread is in this function at a time.
LightMutexHolder holder(_select_mutex);
MutexHolder holder(_select_mutex);
do {
// First, check the result from the previous select call. If
@ -827,6 +817,12 @@ get_next_available_socket(bool allow_block, int current_thread_index) {
if (!allow_block) {
timeout = 0;
}
#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
// In the presence of SIMPLE_THREADS, we never wait at all,
// but rather we yield the thread if we come up empty (so that
// we won't block the entire process).
timeout = 0;
#endif
_num_results = _fdset.WaitForRead(false, timeout);
}
@ -836,6 +832,7 @@ get_next_available_socket(bool allow_block, int current_thread_index) {
// never timeout indefinitely, so we can check the shutdown
// flag every once in a while.)
interrupted = true;
Thread::force_yield();
} else if (_num_results < 0) {
// If we had an error, just return.

View File

@ -20,6 +20,7 @@
#include "connection.h"
#include "pointerTo.h"
#include "pmutex.h"
#include "lightMutex.h"
#include "pvector.h"
#include "pset.h"
@ -76,7 +77,7 @@ PUBLISHED:
void poll();
ConnectionManager *get_manager() const;
bool is_polling() const;
INLINE bool is_polling() const;
int get_num_threads() const;
void set_raw_mode(bool mode);
@ -98,6 +99,7 @@ protected:
bool _busy;
bool _error;
};
typedef pvector<SocketInfo *> Sockets;
void shutdown();
void clear_manager();
@ -109,6 +111,18 @@ protected:
virtual void process_raw_incoming_udp_data(SocketInfo *sinfo);
virtual void process_raw_incoming_tcp_data(SocketInfo *sinfo);
protected:
ConnectionManager *_manager;
// These structures track the total set of sockets (connections) we
// know about.
Sockets _sockets;
// This is the list of recently-removed sockets. We can't actually
// delete them until they're no longer _busy.
Sockets _removed_sockets;
// Any operations on _sockets are protected by this mutex.
LightMutex _sockets_mutex;
private:
void thread_run(int thread_index);
@ -117,9 +131,6 @@ private:
void rebuild_select_list();
protected:
ConnectionManager *_manager;
private:
bool _raw_mode;
int _tcp_header_size;
@ -141,31 +152,22 @@ private:
// These structures are used to manage selecting for noise on
// available sockets.
Socket_fdset _fdset;
typedef pvector<SocketInfo *> Sockets;
Sockets _selecting_sockets;
int _next_index;
int _num_results;
// Threads go to sleep on this mutex waiting for their chance to
// read a socket.
LightMutex _select_mutex;
Mutex _select_mutex;
// This is atomically updated with the index (in _threads) of the
// thread that is currently waiting on the PR_Poll() call. It
// contains -1 if no thread is so waiting.
AtomicAdjust::Integer _currently_polling_thread;
// These structures track the total set of sockets (connections) we
// know about.
Sockets _sockets;
// This is the list of recently-removed sockets. We can't actually
// delete them until they're no longer _busy.
Sockets _removed_sockets;
// Any operations on _sockets are protected by this mutex.
LightMutex _sockets_mutex;
friend class ConnectionManager;
friend class ReaderThread;
};
#include "connectionReader.I"
#endif

View File

@ -0,0 +1,14 @@
// Filename: datagramGeneratorNet.I
// Created by: drose (15Feb09)
//
////////////////////////////////////////////////////////////////////
//
// PANDA 3D SOFTWARE
// Copyright (c) Carnegie Mellon University. All rights reserved.
//
// All use of this software is subject to the terms of the revised BSD
// license. You should have received a copy of this license along
// with this source code in a file named "LICENSE."
//
////////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,138 @@
// Filename: datagramGeneratorNet.cxx
// Created by: drose (15Feb09)
//
////////////////////////////////////////////////////////////////////
//
// PANDA 3D SOFTWARE
// Copyright (c) Carnegie Mellon University. All rights reserved.
//
// All use of this software is subject to the terms of the revised BSD
// license. You should have received a copy of this license along
// with this source code in a file named "LICENSE."
//
////////////////////////////////////////////////////////////////////
#include "pandabase.h"
#include "datagramGeneratorNet.h"
#include "mutexHolder.h"
#include "lightMutexHolder.h"
////////////////////////////////////////////////////////////////////
// Function: DatagramGeneratorNet::Constructor
// Access: Published
// Description: Creates a new DatagramGeneratorNet with the indicated
// number of threads to handle requests. Normally
// num_threads should be either 0 or 1 to guarantee that
// datagrams are generated in the same order in which
// they were received.
////////////////////////////////////////////////////////////////////
DatagramGeneratorNet::
DatagramGeneratorNet(ConnectionManager *manager, int num_threads) :
ConnectionReader(manager, num_threads),
_dg_received(_dg_lock),
_dg_processed(_dg_lock)
{
}
////////////////////////////////////////////////////////////////////
// Function: DatagramGeneratorNet::Destructor
// Access: Published, Virtual
// Description:
////////////////////////////////////////////////////////////////////
DatagramGeneratorNet::
~DatagramGeneratorNet() {
}
////////////////////////////////////////////////////////////////////
// Function: DatagramGeneratorNet::get_datagram
// Access: Published, Virtual
// Description: Reads the next datagram from the stream. Blocks
// until a datagram is available. Returns true on
// success, false on stream closed or error.
////////////////////////////////////////////////////////////////////
bool DatagramGeneratorNet::
get_datagram(Datagram &data) {
if (is_polling()) {
// Single-threaded case: we poll. No need to lock.
if (!thing_available()) {
poll();
}
while (!thing_available()) {
if (is_eof()) {
return false;
}
poll();
Thread::force_yield();
}
bool got_dg = get_thing(data);
nassertr(got_dg, false);
} else {
// Threaded case: no polling, we use mutexes and cvars to block
// instead.
MutexHolder holder(_dg_lock);
while (!thing_available()) {
if (is_eof()) {
return false;
}
_dg_received.wait();
}
bool got_dg = get_thing(data);
nassertr(got_dg, false);
_dg_processed.notify();
}
return true;
}
////////////////////////////////////////////////////////////////////
// Function: DatagramGeneratorNet::is_eof
// Access: Published, Virtual
// Description: Returns true if the stream has been closed normally.
// This test may only be made after a call to
// get_datagram() has failed.
////////////////////////////////////////////////////////////////////
bool DatagramGeneratorNet::
is_eof() {
// We're at eof if we have no more connected sockets.
LightMutexHolder holder(_sockets_mutex);
return _sockets.empty();
}
////////////////////////////////////////////////////////////////////
// Function: DatagramGeneratorNet::is_error
// Access: Published, Virtual
// Description: Returns true if the stream has an error condition.
////////////////////////////////////////////////////////////////////
bool DatagramGeneratorNet::
is_error() {
// There's an error if any one of our connected sockets reports an error.
LightMutexHolder holder(_sockets_mutex);
Sockets::const_iterator si;
for (si = _sockets.begin(); si != _sockets.end(); ++si) {
SocketInfo *sinfo = (*si);
if (sinfo->_error) {
return true;
}
}
return false;
}
////////////////////////////////////////////////////////////////////
// Function: DatagramGeneratorNet::receive_datagram
// Access: Protected, Virtual
// Description: An internal function called by ConnectionReader()
// when a new datagram has become available. This call
// may be received in a sub-thread.
////////////////////////////////////////////////////////////////////
void DatagramGeneratorNet::
receive_datagram(const NetDatagram &datagram) {
MutexHolder holder(_dg_lock);
while (!enqueue_thing(datagram)) {
_dg_processed.wait();
}
_dg_received.notify();
}

View File

@ -0,0 +1,57 @@
// Filename: datagramGeneratorNet.h
// Created by: drose (15Feb09)
//
////////////////////////////////////////////////////////////////////
//
// PANDA 3D SOFTWARE
// Copyright (c) Carnegie Mellon University. All rights reserved.
//
// All use of this software is subject to the terms of the revised BSD
// license. You should have received a copy of this license along
// with this source code in a file named "LICENSE."
//
////////////////////////////////////////////////////////////////////
#ifndef DATAGRAMGENERATORNET_H
#define DATAGRAMGENERATORNET_H
#include "pandabase.h"
#include "datagramGenerator.h"
#include "connectionReader.h"
#include "queuedReturn.h"
#include "pmutex.h"
#include "conditionVar.h"
EXPORT_TEMPLATE_CLASS(EXPCL_PANDA_NET, EXPTP_PANDA_NET, QueuedReturn<Datagram>);
////////////////////////////////////////////////////////////////////
// Class : DatagramGeneratorNet
// Description : This class provides datagrams one-at-a-time as read
// directly from the net, via a TCP connection. If a
// datagram is not available, get_datagram() will block
// until one is.
////////////////////////////////////////////////////////////////////
class EXPCL_PANDAEXPRESS DatagramGeneratorNet : public DatagramGenerator, public ConnectionReader, public QueuedReturn<Datagram> {
PUBLISHED:
DatagramGeneratorNet(ConnectionManager *manager, int num_threads);
virtual ~DatagramGeneratorNet();
// Inherited from DatagramGenerator
virtual bool get_datagram(Datagram &data);
virtual bool is_eof();
virtual bool is_error();
protected:
// Inherited from ConnectionReader
virtual void receive_datagram(const NetDatagram &datagram);
Mutex _dg_lock;
ConditionVar _dg_received; // notified when a new datagram is received.
ConditionVar _dg_processed; // notified when a new datagram is processed.
};
#include "datagramGeneratorNet.I"
#endif

View File

@ -0,0 +1,36 @@
// Filename: datagramSinkNet.I
// Created by: drose (15Feb09)
//
////////////////////////////////////////////////////////////////////
//
// PANDA 3D SOFTWARE
// Copyright (c) Carnegie Mellon University. All rights reserved.
//
// All use of this software is subject to the terms of the revised BSD
// license. You should have received a copy of this license along
// with this source code in a file named "LICENSE."
//
////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////
// Function: DatagramSinkNet::set_target
// Access: Published
// Description: Specifies the Connection that will receive all future
// Datagrams sent.
////////////////////////////////////////////////////////////////////
INLINE void DatagramSinkNet::
set_target(Connection *connection) {
_target = connection;
}
////////////////////////////////////////////////////////////////////
// Function: DatagramSinkNet::get_target
// Access: Published
// Description: Returns the current target Connection, or NULL if the
// target has not yet been set. See set_target().
////////////////////////////////////////////////////////////////////
INLINE Connection *DatagramSinkNet::
get_target() const {
return _target;
}

View File

@ -0,0 +1,58 @@
// Filename: datagramSinkNet.cxx
// Created by: drose (15Feb09)
//
////////////////////////////////////////////////////////////////////
//
// PANDA 3D SOFTWARE
// Copyright (c) Carnegie Mellon University. All rights reserved.
//
// All use of this software is subject to the terms of the revised BSD
// license. You should have received a copy of this license along
// with this source code in a file named "LICENSE."
//
////////////////////////////////////////////////////////////////////
#include "pandabase.h"
#include "datagramSinkNet.h"
////////////////////////////////////////////////////////////////////
// Function: DatagramSinkNet::Constructor
// Access: Published
// Description: Creates a new DatagramSinkNet with the indicated
// number of threads to handle writing. Normally
// num_threads should be either 0 or 1 to guarantee that
// datagrams are delivered in the same order in which
// they were sent.
////////////////////////////////////////////////////////////////////
DatagramSinkNet::
DatagramSinkNet(ConnectionManager *manager, int num_threads) :
ConnectionWriter(manager, num_threads)
{
}
////////////////////////////////////////////////////////////////////
// Function: DatagramSinkNet::put_datagram
// Access: Published, Virtual
// Description: Sends the given datagram to the target. Returns true
// on success, false if there is an error. Blocks if
// necessary until the target is ready.
////////////////////////////////////////////////////////////////////
bool DatagramSinkNet::
put_datagram(const Datagram &data) {
if (_target == (Connection *)NULL) {
return false;
}
return send(data, _target, true);
}
////////////////////////////////////////////////////////////////////
// Function: DatagramSinkNet::is_error
// Access: Published, Virtual
// Description: Returns true if there is an error on the target
// connection, or if the target has never been set.
////////////////////////////////////////////////////////////////////
bool DatagramSinkNet::
is_error() {
return (_target == (Connection *)NULL || _target->get_socket() == (Socket_IP *)NULL);
}

View File

@ -0,0 +1,45 @@
// Filename: datagramSinkNet.h
// Created by: drose (15Feb09)
//
////////////////////////////////////////////////////////////////////
//
// PANDA 3D SOFTWARE
// Copyright (c) Carnegie Mellon University. All rights reserved.
//
// All use of this software is subject to the terms of the revised BSD
// license. You should have received a copy of this license along
// with this source code in a file named "LICENSE."
//
////////////////////////////////////////////////////////////////////
#ifndef DATAGRAMSINKNET_H
#define DATAGRAMSINKNET_H
#include "pandabase.h"
#include "datagramSink.h"
#include "connectionWriter.h"
////////////////////////////////////////////////////////////////////
// Class : DatagramSinkNet
// Description : This class accepts datagrams one-at-a-time and sends
// them over the net, via a TCP connection.
////////////////////////////////////////////////////////////////////
class EXPCL_PANDA_PUTIL DatagramSinkNet : public DatagramSink, public ConnectionWriter {
PUBLISHED:
DatagramSinkNet(ConnectionManager *manager, int num_threads);
INLINE void set_target(Connection *connection);
INLINE Connection *get_target() const;
// Inherited from DatagramSink
virtual bool put_datagram(const Datagram &data);
virtual bool is_error();
private:
PT(Connection) _target;
};
#include "datagramSinkNet.I"
#endif

View File

@ -5,6 +5,6 @@
#include "connectionManager.cxx"
#include "connectionReader.cxx"
#include "connectionWriter.cxx"
#include "datagramGeneratorNet.cxx"
#include "datagramSinkNet.cxx"
#include "datagramQueue.cxx"
#include "datagramTCPHeader.cxx"
#include "datagramUDPHeader.cxx"

View File

@ -1,4 +1,6 @@
#include "datagramTCPHeader.cxx"
#include "datagramUDPHeader.cxx"
#include "netAddress.cxx"
#include "netDatagram.cxx"
#include "queuedConnectionListener.cxx"