diff --git a/panda/src/express/datagramGenerator.h b/panda/src/express/datagramGenerator.h index 4b3694fc68..a1fda45378 100644 --- a/panda/src/express/datagramGenerator.h +++ b/panda/src/express/datagramGenerator.h @@ -28,7 +28,7 @@ class VirtualFile; // from the net //////////////////////////////////////////////////////////////////// class EXPCL_PANDAEXPRESS DatagramGenerator { -public: +PUBLISHED: INLINE DatagramGenerator(); virtual ~DatagramGenerator(); diff --git a/panda/src/express/datagramSink.h b/panda/src/express/datagramSink.h index 51917b8071..40bb3dfe2e 100644 --- a/panda/src/express/datagramSink.h +++ b/panda/src/express/datagramSink.h @@ -26,7 +26,7 @@ // or across the net //////////////////////////////////////////////////////////////////// class EXPCL_PANDAEXPRESS DatagramSink { -public: +PUBLISHED: INLINE DatagramSink(); virtual ~DatagramSink(); diff --git a/panda/src/net/Sources.pp b/panda/src/net/Sources.pp index e11f9a8405..f2f3071411 100644 --- a/panda/src/net/Sources.pp +++ b/panda/src/net/Sources.pp @@ -13,11 +13,14 @@ #define SOURCES \ config_net.h connection.h connectionListener.h \ - connectionManager.N connectionManager.h connectionReader.h \ + connectionManager.N connectionManager.h \ + connectionReader.I connectionReader.h \ connectionWriter.h datagramQueue.h \ datagramTCPHeader.I datagramTCPHeader.h \ datagramUDPHeader.I datagramUDPHeader.h \ netAddress.h netDatagram.I netDatagram.h \ + datagramGeneratorNet.I datagramGeneratorNet.h \ + datagramSinkNet.I datagramSinkNet.h \ queuedConnectionListener.I \ queuedConnectionListener.h queuedConnectionManager.h \ queuedConnectionReader.h recentConnectionReader.h \ @@ -28,17 +31,22 @@ connectionManager.cxx connectionReader.cxx \ connectionWriter.cxx datagramQueue.cxx datagramTCPHeader.cxx \ datagramUDPHeader.cxx netAddress.cxx netDatagram.cxx \ + datagramGeneratorNet.cxx \ + datagramSinkNet.cxx \ queuedConnectionListener.cxx \ queuedConnectionManager.cxx queuedConnectionReader.cxx \ recentConnectionReader.cxx #define INSTALL_HEADERS \ config_net.h connection.h connectionListener.h connectionManager.h \ - connectionReader.h connectionWriter.h datagramQueue.h \ + connectionReader.I connectionReader.h \ + connectionWriter.h datagramQueue.h \ datagramTCPHeader.I datagramTCPHeader.h \ datagramUDPHeader.I datagramUDPHeader.h \ netAddress.h netDatagram.I \ netDatagram.h queuedConnectionListener.I \ + datagramGeneratorNet.I datagramGeneratorNet.h \ + datagramSinkNet.I datagramSinkNet.h \ queuedConnectionListener.h queuedConnectionManager.h \ queuedConnectionReader.h queuedReturn.I queuedReturn.h \ recentConnectionReader.h diff --git a/panda/src/net/connectionReader.I b/panda/src/net/connectionReader.I new file mode 100644 index 0000000000..ed663c559e --- /dev/null +++ b/panda/src/net/connectionReader.I @@ -0,0 +1,25 @@ +// Filename: connectionReader.I +// Created by: drose (15Feb09) +// +//////////////////////////////////////////////////////////////////// +// +// PANDA 3D SOFTWARE +// Copyright (c) Carnegie Mellon University. All rights reserved. +// +// All use of this software is subject to the terms of the revised BSD +// license. You should have received a copy of this license along +// with this source code in a file named "LICENSE." +// +//////////////////////////////////////////////////////////////////// + + +//////////////////////////////////////////////////////////////////// +// Function: ConnectionReader::is_polling +// Access: Public +// Description: Returns true if the reader is a polling reader, +// i.e. it has no threads. +//////////////////////////////////////////////////////////////////// +INLINE bool ConnectionReader:: +is_polling() const { + return _polling; +} diff --git a/panda/src/net/connectionReader.cxx b/panda/src/net/connectionReader.cxx index 4766d22525..143de566f2 100644 --- a/panda/src/net/connectionReader.cxx +++ b/panda/src/net/connectionReader.cxx @@ -21,6 +21,7 @@ #include "trueClock.h" #include "socket_udp.h" #include "socket_tcp.h" +#include "mutexHolder.h" #include "lightMutexHolder.h" #include "pnotify.h" #include "atomicAdjust.h" @@ -322,17 +323,6 @@ get_manager() const { return _manager; } -//////////////////////////////////////////////////////////////////// -// Function: ConnectionReader::is_polling -// Access: Public -// Description: Returns true if the reader is a polling reader, -// i.e. it has no threads. -//////////////////////////////////////////////////////////////////// -bool ConnectionReader:: -is_polling() const { - return _polling; -} - //////////////////////////////////////////////////////////////////// // Function: ConnectionReader::get_num_threads // Access: Public @@ -560,7 +550,7 @@ process_incoming_tcp_data(SocketInfo *sinfo) { socket->RecvData(buffer + header_bytes_read, _tcp_header_size - header_bytes_read); #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) - while (bytes_read <= 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR) { + while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR) { Thread::force_yield(); bytes_read = socket->RecvData(buffer + header_bytes_read, _tcp_header_size - header_bytes_read); @@ -603,7 +593,7 @@ process_incoming_tcp_data(SocketInfo *sinfo) { socket->RecvData(buffer, min(read_buffer_size, (int)(size - datagram.get_length()))); #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) - while (bytes_read <= 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR) { + while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR) { Thread::force_yield(); bytes_read = socket->RecvData(buffer, min(read_buffer_size, @@ -716,7 +706,7 @@ process_raw_incoming_tcp_data(SocketInfo *sinfo) { char buffer[read_buffer_size]; int bytes_read = socket->RecvData(buffer, read_buffer_size); #if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) - while (bytes_read <= 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR) { + while (bytes_read < 0 && socket->GetLastError() == LOCAL_BLOCKING_ERROR) { Thread::force_yield(); bytes_read = socket->RecvData(buffer, read_buffer_size); } @@ -784,7 +774,7 @@ ConnectionReader::SocketInfo *ConnectionReader:: get_next_available_socket(bool allow_block, int current_thread_index) { // Go to sleep on the select() mutex. This guarantees that only one // thread is in this function at a time. - LightMutexHolder holder(_select_mutex); + MutexHolder holder(_select_mutex); do { // First, check the result from the previous select call. If @@ -827,6 +817,12 @@ get_next_available_socket(bool allow_block, int current_thread_index) { if (!allow_block) { timeout = 0; } +#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) + // In the presence of SIMPLE_THREADS, we never wait at all, + // but rather we yield the thread if we come up empty (so that + // we won't block the entire process). + timeout = 0; +#endif _num_results = _fdset.WaitForRead(false, timeout); } @@ -836,6 +832,7 @@ get_next_available_socket(bool allow_block, int current_thread_index) { // never timeout indefinitely, so we can check the shutdown // flag every once in a while.) interrupted = true; + Thread::force_yield(); } else if (_num_results < 0) { // If we had an error, just return. diff --git a/panda/src/net/connectionReader.h b/panda/src/net/connectionReader.h index d3730c3337..b22978917a 100644 --- a/panda/src/net/connectionReader.h +++ b/panda/src/net/connectionReader.h @@ -20,6 +20,7 @@ #include "connection.h" #include "pointerTo.h" +#include "pmutex.h" #include "lightMutex.h" #include "pvector.h" #include "pset.h" @@ -76,7 +77,7 @@ PUBLISHED: void poll(); ConnectionManager *get_manager() const; - bool is_polling() const; + INLINE bool is_polling() const; int get_num_threads() const; void set_raw_mode(bool mode); @@ -98,6 +99,7 @@ protected: bool _busy; bool _error; }; + typedef pvector Sockets; void shutdown(); void clear_manager(); @@ -109,6 +111,18 @@ protected: virtual void process_raw_incoming_udp_data(SocketInfo *sinfo); virtual void process_raw_incoming_tcp_data(SocketInfo *sinfo); +protected: + ConnectionManager *_manager; + + // These structures track the total set of sockets (connections) we + // know about. + Sockets _sockets; + // This is the list of recently-removed sockets. We can't actually + // delete them until they're no longer _busy. + Sockets _removed_sockets; + // Any operations on _sockets are protected by this mutex. + LightMutex _sockets_mutex; + private: void thread_run(int thread_index); @@ -117,9 +131,6 @@ private: void rebuild_select_list(); -protected: - ConnectionManager *_manager; - private: bool _raw_mode; int _tcp_header_size; @@ -141,31 +152,22 @@ private: // These structures are used to manage selecting for noise on // available sockets. Socket_fdset _fdset; - typedef pvector Sockets; Sockets _selecting_sockets; int _next_index; int _num_results; // Threads go to sleep on this mutex waiting for their chance to // read a socket. - LightMutex _select_mutex; + Mutex _select_mutex; // This is atomically updated with the index (in _threads) of the // thread that is currently waiting on the PR_Poll() call. It // contains -1 if no thread is so waiting. AtomicAdjust::Integer _currently_polling_thread; - // These structures track the total set of sockets (connections) we - // know about. - Sockets _sockets; - // This is the list of recently-removed sockets. We can't actually - // delete them until they're no longer _busy. - Sockets _removed_sockets; - // Any operations on _sockets are protected by this mutex. - LightMutex _sockets_mutex; - - friend class ConnectionManager; friend class ReaderThread; }; +#include "connectionReader.I" + #endif diff --git a/panda/src/net/datagramGeneratorNet.I b/panda/src/net/datagramGeneratorNet.I new file mode 100644 index 0000000000..eac8390c09 --- /dev/null +++ b/panda/src/net/datagramGeneratorNet.I @@ -0,0 +1,14 @@ +// Filename: datagramGeneratorNet.I +// Created by: drose (15Feb09) +// +//////////////////////////////////////////////////////////////////// +// +// PANDA 3D SOFTWARE +// Copyright (c) Carnegie Mellon University. All rights reserved. +// +// All use of this software is subject to the terms of the revised BSD +// license. You should have received a copy of this license along +// with this source code in a file named "LICENSE." +// +//////////////////////////////////////////////////////////////////// + diff --git a/panda/src/net/datagramGeneratorNet.cxx b/panda/src/net/datagramGeneratorNet.cxx new file mode 100644 index 0000000000..50fa02cf1c --- /dev/null +++ b/panda/src/net/datagramGeneratorNet.cxx @@ -0,0 +1,138 @@ +// Filename: datagramGeneratorNet.cxx +// Created by: drose (15Feb09) +// +//////////////////////////////////////////////////////////////////// +// +// PANDA 3D SOFTWARE +// Copyright (c) Carnegie Mellon University. All rights reserved. +// +// All use of this software is subject to the terms of the revised BSD +// license. You should have received a copy of this license along +// with this source code in a file named "LICENSE." +// +//////////////////////////////////////////////////////////////////// + +#include "pandabase.h" + +#include "datagramGeneratorNet.h" +#include "mutexHolder.h" +#include "lightMutexHolder.h" + +//////////////////////////////////////////////////////////////////// +// Function: DatagramGeneratorNet::Constructor +// Access: Published +// Description: Creates a new DatagramGeneratorNet with the indicated +// number of threads to handle requests. Normally +// num_threads should be either 0 or 1 to guarantee that +// datagrams are generated in the same order in which +// they were received. +//////////////////////////////////////////////////////////////////// +DatagramGeneratorNet:: +DatagramGeneratorNet(ConnectionManager *manager, int num_threads) : + ConnectionReader(manager, num_threads), + _dg_received(_dg_lock), + _dg_processed(_dg_lock) +{ +} + +//////////////////////////////////////////////////////////////////// +// Function: DatagramGeneratorNet::Destructor +// Access: Published, Virtual +// Description: +//////////////////////////////////////////////////////////////////// +DatagramGeneratorNet:: +~DatagramGeneratorNet() { +} + +//////////////////////////////////////////////////////////////////// +// Function: DatagramGeneratorNet::get_datagram +// Access: Published, Virtual +// Description: Reads the next datagram from the stream. Blocks +// until a datagram is available. Returns true on +// success, false on stream closed or error. +//////////////////////////////////////////////////////////////////// +bool DatagramGeneratorNet:: +get_datagram(Datagram &data) { + if (is_polling()) { + // Single-threaded case: we poll. No need to lock. + if (!thing_available()) { + poll(); + } + while (!thing_available()) { + if (is_eof()) { + return false; + } + poll(); + Thread::force_yield(); + } + bool got_dg = get_thing(data); + nassertr(got_dg, false); + + } else { + // Threaded case: no polling, we use mutexes and cvars to block + // instead. + MutexHolder holder(_dg_lock); + while (!thing_available()) { + if (is_eof()) { + return false; + } + _dg_received.wait(); + } + bool got_dg = get_thing(data); + nassertr(got_dg, false); + _dg_processed.notify(); + } + + return true; +} + +//////////////////////////////////////////////////////////////////// +// Function: DatagramGeneratorNet::is_eof +// Access: Published, Virtual +// Description: Returns true if the stream has been closed normally. +// This test may only be made after a call to +// get_datagram() has failed. +//////////////////////////////////////////////////////////////////// +bool DatagramGeneratorNet:: +is_eof() { + // We're at eof if we have no more connected sockets. + LightMutexHolder holder(_sockets_mutex); + return _sockets.empty(); +} + +//////////////////////////////////////////////////////////////////// +// Function: DatagramGeneratorNet::is_error +// Access: Published, Virtual +// Description: Returns true if the stream has an error condition. +//////////////////////////////////////////////////////////////////// +bool DatagramGeneratorNet:: +is_error() { + // There's an error if any one of our connected sockets reports an error. + LightMutexHolder holder(_sockets_mutex); + Sockets::const_iterator si; + for (si = _sockets.begin(); si != _sockets.end(); ++si) { + SocketInfo *sinfo = (*si); + if (sinfo->_error) { + return true; + } + } + + return false; +} + +//////////////////////////////////////////////////////////////////// +// Function: DatagramGeneratorNet::receive_datagram +// Access: Protected, Virtual +// Description: An internal function called by ConnectionReader() +// when a new datagram has become available. This call +// may be received in a sub-thread. +//////////////////////////////////////////////////////////////////// +void DatagramGeneratorNet:: +receive_datagram(const NetDatagram &datagram) { + MutexHolder holder(_dg_lock); + while (!enqueue_thing(datagram)) { + _dg_processed.wait(); + } + _dg_received.notify(); +} + diff --git a/panda/src/net/datagramGeneratorNet.h b/panda/src/net/datagramGeneratorNet.h new file mode 100644 index 0000000000..fc6697138b --- /dev/null +++ b/panda/src/net/datagramGeneratorNet.h @@ -0,0 +1,57 @@ +// Filename: datagramGeneratorNet.h +// Created by: drose (15Feb09) +// +//////////////////////////////////////////////////////////////////// +// +// PANDA 3D SOFTWARE +// Copyright (c) Carnegie Mellon University. All rights reserved. +// +// All use of this software is subject to the terms of the revised BSD +// license. You should have received a copy of this license along +// with this source code in a file named "LICENSE." +// +//////////////////////////////////////////////////////////////////// + +#ifndef DATAGRAMGENERATORNET_H +#define DATAGRAMGENERATORNET_H + +#include "pandabase.h" + +#include "datagramGenerator.h" +#include "connectionReader.h" +#include "queuedReturn.h" +#include "pmutex.h" +#include "conditionVar.h" + +EXPORT_TEMPLATE_CLASS(EXPCL_PANDA_NET, EXPTP_PANDA_NET, QueuedReturn); + +//////////////////////////////////////////////////////////////////// +// Class : DatagramGeneratorNet +// Description : This class provides datagrams one-at-a-time as read +// directly from the net, via a TCP connection. If a +// datagram is not available, get_datagram() will block +// until one is. +//////////////////////////////////////////////////////////////////// +class EXPCL_PANDAEXPRESS DatagramGeneratorNet : public DatagramGenerator, public ConnectionReader, public QueuedReturn { +PUBLISHED: + DatagramGeneratorNet(ConnectionManager *manager, int num_threads); + virtual ~DatagramGeneratorNet(); + + // Inherited from DatagramGenerator + virtual bool get_datagram(Datagram &data); + virtual bool is_eof(); + virtual bool is_error(); + +protected: + // Inherited from ConnectionReader + virtual void receive_datagram(const NetDatagram &datagram); + + Mutex _dg_lock; + ConditionVar _dg_received; // notified when a new datagram is received. + ConditionVar _dg_processed; // notified when a new datagram is processed. +}; + +#include "datagramGeneratorNet.I" + +#endif + diff --git a/panda/src/net/datagramSinkNet.I b/panda/src/net/datagramSinkNet.I new file mode 100644 index 0000000000..bfd80694c4 --- /dev/null +++ b/panda/src/net/datagramSinkNet.I @@ -0,0 +1,36 @@ +// Filename: datagramSinkNet.I +// Created by: drose (15Feb09) +// +//////////////////////////////////////////////////////////////////// +// +// PANDA 3D SOFTWARE +// Copyright (c) Carnegie Mellon University. All rights reserved. +// +// All use of this software is subject to the terms of the revised BSD +// license. You should have received a copy of this license along +// with this source code in a file named "LICENSE." +// +//////////////////////////////////////////////////////////////////// + + +//////////////////////////////////////////////////////////////////// +// Function: DatagramSinkNet::set_target +// Access: Published +// Description: Specifies the Connection that will receive all future +// Datagrams sent. +//////////////////////////////////////////////////////////////////// +INLINE void DatagramSinkNet:: +set_target(Connection *connection) { + _target = connection; +} + +//////////////////////////////////////////////////////////////////// +// Function: DatagramSinkNet::get_target +// Access: Published +// Description: Returns the current target Connection, or NULL if the +// target has not yet been set. See set_target(). +//////////////////////////////////////////////////////////////////// +INLINE Connection *DatagramSinkNet:: +get_target() const { + return _target; +} diff --git a/panda/src/net/datagramSinkNet.cxx b/panda/src/net/datagramSinkNet.cxx new file mode 100644 index 0000000000..89e70f0bef --- /dev/null +++ b/panda/src/net/datagramSinkNet.cxx @@ -0,0 +1,58 @@ +// Filename: datagramSinkNet.cxx +// Created by: drose (15Feb09) +// +//////////////////////////////////////////////////////////////////// +// +// PANDA 3D SOFTWARE +// Copyright (c) Carnegie Mellon University. All rights reserved. +// +// All use of this software is subject to the terms of the revised BSD +// license. You should have received a copy of this license along +// with this source code in a file named "LICENSE." +// +//////////////////////////////////////////////////////////////////// + +#include "pandabase.h" + +#include "datagramSinkNet.h" + +//////////////////////////////////////////////////////////////////// +// Function: DatagramSinkNet::Constructor +// Access: Published +// Description: Creates a new DatagramSinkNet with the indicated +// number of threads to handle writing. Normally +// num_threads should be either 0 or 1 to guarantee that +// datagrams are delivered in the same order in which +// they were sent. +//////////////////////////////////////////////////////////////////// +DatagramSinkNet:: +DatagramSinkNet(ConnectionManager *manager, int num_threads) : + ConnectionWriter(manager, num_threads) +{ +} + +//////////////////////////////////////////////////////////////////// +// Function: DatagramSinkNet::put_datagram +// Access: Published, Virtual +// Description: Sends the given datagram to the target. Returns true +// on success, false if there is an error. Blocks if +// necessary until the target is ready. +//////////////////////////////////////////////////////////////////// +bool DatagramSinkNet:: +put_datagram(const Datagram &data) { + if (_target == (Connection *)NULL) { + return false; + } + return send(data, _target, true); +} + +//////////////////////////////////////////////////////////////////// +// Function: DatagramSinkNet::is_error +// Access: Published, Virtual +// Description: Returns true if there is an error on the target +// connection, or if the target has never been set. +//////////////////////////////////////////////////////////////////// +bool DatagramSinkNet:: +is_error() { + return (_target == (Connection *)NULL || _target->get_socket() == (Socket_IP *)NULL); +} diff --git a/panda/src/net/datagramSinkNet.h b/panda/src/net/datagramSinkNet.h new file mode 100644 index 0000000000..cd0dc4a33b --- /dev/null +++ b/panda/src/net/datagramSinkNet.h @@ -0,0 +1,45 @@ +// Filename: datagramSinkNet.h +// Created by: drose (15Feb09) +// +//////////////////////////////////////////////////////////////////// +// +// PANDA 3D SOFTWARE +// Copyright (c) Carnegie Mellon University. All rights reserved. +// +// All use of this software is subject to the terms of the revised BSD +// license. You should have received a copy of this license along +// with this source code in a file named "LICENSE." +// +//////////////////////////////////////////////////////////////////// + +#ifndef DATAGRAMSINKNET_H +#define DATAGRAMSINKNET_H + +#include "pandabase.h" + +#include "datagramSink.h" +#include "connectionWriter.h" + +//////////////////////////////////////////////////////////////////// +// Class : DatagramSinkNet +// Description : This class accepts datagrams one-at-a-time and sends +// them over the net, via a TCP connection. +//////////////////////////////////////////////////////////////////// +class EXPCL_PANDA_PUTIL DatagramSinkNet : public DatagramSink, public ConnectionWriter { +PUBLISHED: + DatagramSinkNet(ConnectionManager *manager, int num_threads); + + INLINE void set_target(Connection *connection); + INLINE Connection *get_target() const; + + // Inherited from DatagramSink + virtual bool put_datagram(const Datagram &data); + virtual bool is_error(); + +private: + PT(Connection) _target; +}; + +#include "datagramSinkNet.I" + +#endif diff --git a/panda/src/net/net_composite1.cxx b/panda/src/net/net_composite1.cxx index 4470824594..5dff58fac0 100644 --- a/panda/src/net/net_composite1.cxx +++ b/panda/src/net/net_composite1.cxx @@ -5,6 +5,6 @@ #include "connectionManager.cxx" #include "connectionReader.cxx" #include "connectionWriter.cxx" +#include "datagramGeneratorNet.cxx" +#include "datagramSinkNet.cxx" #include "datagramQueue.cxx" -#include "datagramTCPHeader.cxx" -#include "datagramUDPHeader.cxx" diff --git a/panda/src/net/net_composite2.cxx b/panda/src/net/net_composite2.cxx index af3fbd07e7..285a336887 100644 --- a/panda/src/net/net_composite2.cxx +++ b/panda/src/net/net_composite2.cxx @@ -1,4 +1,6 @@ +#include "datagramTCPHeader.cxx" +#include "datagramUDPHeader.cxx" #include "netAddress.cxx" #include "netDatagram.cxx" #include "queuedConnectionListener.cxx"