Changes to support yielding and read optimizations for AI

This commit is contained in:
Roger Hughston 2007-05-05 00:36:36 +00:00
parent ffa480bde7
commit 0ecf5adfbb
8 changed files with 66 additions and 52 deletions

View File

@ -179,31 +179,17 @@ void Datagram::assign(const void *data, size_t size)
{ {
nassertv((int)size >= 0); nassertv((int)size >= 0);
if (_data == (uchar *)NULL) { if (_data == (uchar *)NULL || _data.get_ref_count() != 1)
{
// Create a new array. // Create a new array.
// or copy on change..
_data = PTA_uchar::empty_array(0); _data = PTA_uchar::empty_array(0);
} else if (_data.get_ref_count() != 1) {
// Copy on write.
PTA_uchar new_data = PTA_uchar::empty_array(0);
new_data.v() = _data.v();
_data = new_data;
} }
else
// Now append the data. {
_data.v().clear();
// It is very important that we *don't* do this reserve() operation.
// This actually slows it down on Windows, which takes the reserve()
// request as a fixed size the array should be set to (!) instead of
// as a minimum size to guarantee. This forces the array to
// reallocate itself with *every* call to append_data!
// _data.reserve(_data.size() + size);
_data.clear();
const uchar *source = (const uchar *)data;
for (size_t i = 0; i < size; ++i) {
_data.v().push_back(source[i]);
} }
_data.v().insert(_data.v().begin(),(uchar *)data,&((uchar *)data)[size]);
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function : output // Function : output

View File

@ -61,7 +61,16 @@ operator = (const DatagramIterator &copy) {
_datagram = copy._datagram; _datagram = copy._datagram;
_current_index = copy._current_index; _current_index = copy._current_index;
} }
////////////////////////////////////////////////////////////////////
// Function: DatagramIterator::assign
// Access: Public
// Description: direct Assigment to a Datagram
////////////////////////////////////////////////////////////////////
INLINE void DatagramIterator::assign(Datagram &datagram, size_t offset)
{
_datagram =&datagram;
_current_index = offset;
}
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function: DatagramIterator::Destructor // Function: DatagramIterator::Destructor
// Access: Public // Access: Public

View File

@ -32,6 +32,8 @@
// know the correct type and order of each element. // know the correct type and order of each element.
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
class EXPCL_PANDAEXPRESS DatagramIterator { class EXPCL_PANDAEXPRESS DatagramIterator {
public:
INLINE void assign(Datagram &datagram, size_t offset = 0);
PUBLISHED: PUBLISHED:
INLINE DatagramIterator(); INLINE DatagramIterator();
INLINE DatagramIterator(const Datagram &datagram, size_t offset = 0); INLINE DatagramIterator(const Datagram &datagram, size_t offset = 0);

View File

@ -70,13 +70,24 @@ PUBLISHED:
inline bool GetMessage(Datagram &val); inline bool GetMessage(Datagram &val);
inline bool DoConnect(void); // all the real state magic is in here inline bool DoConnect(void); // all the real state magic is in here
inline bool IsConnected(void); inline bool IsConnected(void);
inline Buffered_DatagramConnection(bool do_blocking_writes, int rbufsize, int wbufsize, int write_flush_point) ; inline Buffered_DatagramConnection(int rbufsize, int wbufsize, int write_flush_point) ;
virtual ~Buffered_DatagramConnection(void) ; virtual ~Buffered_DatagramConnection(void) ;
// the reason thsi all exists // the reason thsi all exists
inline bool SendMessage(const Datagram &msg); inline bool SendMessage(const Datagram &msg);
inline bool Flush(void); inline bool Flush(void);
inline void Reset(void); inline void Reset(void);
// int WaitFor_Read_Error(const Socket_fdset & fd, const Time_Span & timeout);
inline void WaitForNetworkReadEvent(float MaxTime)
{
Socket_fdset fdset;
fdset.setForSocket(*this);
Socket_Selector selector;
Time_Span waittime(MaxTime);
selector.WaitFor_Read_Error(fdset,waittime);
}
// address queue stuff // address queue stuff
inline size_t AddressQueueSize() { return _Addresslist.size(); }; inline size_t AddressQueueSize() { return _Addresslist.size(); };
@ -127,11 +138,9 @@ inline bool Buffered_DatagramConnection::DoConnect(void) {
if(!_Addresslist.GetNext(_Adddress)) // lookup the proper value... if(!_Addresslist.GetNext(_Adddress)) // lookup the proper value...
return false; return false;
if(ActiveOpen(_Adddress) == true) { if(ActiveOpen(_Adddress,true) == true) {
SetNoDelay();
SetNonBlocking(); // maybe should be blocking? SetNonBlocking(); // maybe should be blocking?
//Let the operators do this
//SetSendBufferSize(1024*50); // we need to hand tune these for the os we are using
//SetRecvBufferSize(1024*50);
NewWriteBuffer(); NewWriteBuffer();
return true; return true;
} }
@ -209,8 +218,8 @@ inline Buffered_DatagramConnection::~Buffered_DatagramConnection(void)
// Argument : int rbufsize // Argument : int rbufsize
// Argument : int wbufsize // Argument : int wbufsize
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
inline Buffered_DatagramConnection::Buffered_DatagramConnection(bool do_blocking_writes, int rbufsize, int wbufsize, int write_flush_point) inline Buffered_DatagramConnection::Buffered_DatagramConnection(int rbufsize, int wbufsize, int write_flush_point)
: _Writer(do_blocking_writes,wbufsize,write_flush_point) , _Reader(rbufsize) : _Writer(wbufsize,write_flush_point) , _Reader(rbufsize)
{ {
nativenet_cat.error() << "Buffered_DatagramConnection Constructor rbufsize = " << rbufsize nativenet_cat.error() << "Buffered_DatagramConnection Constructor rbufsize = " << rbufsize
<< " wbufsize = " << wbufsize << " write_flush_point = " << write_flush_point << "\n"; << " wbufsize = " << wbufsize << " write_flush_point = " << write_flush_point << "\n";
@ -276,7 +285,8 @@ inline void Buffered_DatagramConnection::ClearAddresses(void)
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
inline bool Buffered_DatagramConnection::GetMessage(Datagram &val) inline bool Buffered_DatagramConnection::GetMessage(Datagram &val)
{ {
if(IsConnected()) { if(IsConnected())
{
int ans1 = _Reader.PumpMessageReader(val,*this); int ans1 = _Reader.PumpMessageReader(val,*this);
if(ans1 == 0) if(ans1 == 0)
return false; return false;
@ -290,6 +300,8 @@ inline bool Buffered_DatagramConnection::GetMessage(Datagram &val)
return false; return false;
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function name : Buffered_DatagramConnection::Flush // Function name : Buffered_DatagramConnection::Flush
// Description : flush all wrightes // Description : flush all wrightes

View File

@ -18,12 +18,11 @@
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
class Buffered_DatagramWriter : public RingBuffer class Buffered_DatagramWriter : public RingBuffer
{ {
bool _are_we_going_to_block_on_write;
int _flush_point; int _flush_point;
public: public:
inline void ReSet(void); // destroy all buffered data inline void ReSet(void); // destroy all buffered data
Buffered_DatagramWriter(bool do_blocking, size_t in_size , int in_flush_point = -1); Buffered_DatagramWriter( size_t in_size , int in_flush_point = -1);
inline int AddData(const void * data, size_t len, Socket_TCP &sck); inline int AddData(const void * data, size_t len, Socket_TCP &sck);
inline int AddData(const void * data, size_t len); inline int AddData(const void * data, size_t len);
// THE FUNCTIONS THAT TAKE A SOCKET NEED TO BE TEMPLATED TO WORK.. // THE FUNCTIONS THAT TAKE A SOCKET NEED TO BE TEMPLATED TO WORK..
@ -60,12 +59,6 @@ public:
if(Writesize > 0) { if(Writesize > 0) {
int Writen = sck.SendData(GetMessageHead(),(int)Writesize); int Writen = sck.SendData(GetMessageHead(),(int)Writesize);
if(_are_we_going_to_block_on_write == true && Writen < 0 && sck.ErrorIs_WouldBlocking(Writen) == true) {
//sck.SetBlocking();
Writen = sck.SendData(GetMessageHead(),(int)Writesize);
//sck.SetNonBlocking();
}
if(Writen > 0) { if(Writen > 0) {
_StartPos += Writen; _StartPos += Writen;
@ -98,9 +91,8 @@ inline void Buffered_DatagramWriter::ReSet(void) {
// //
// //
//////////////////////////////////////////////// ////////////////////////////////////////////////
inline Buffered_DatagramWriter::Buffered_DatagramWriter(bool do_blocking, size_t in_size , int in_flush_point) : RingBuffer(in_size) { inline Buffered_DatagramWriter::Buffered_DatagramWriter( size_t in_size , int in_flush_point) : RingBuffer(in_size) {
_flush_point = in_flush_point; _flush_point = in_flush_point;
_are_we_going_to_block_on_write = do_blocking;
} }
////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////
@ -120,12 +112,12 @@ inline int Buffered_DatagramWriter::AddData(const void * data, size_t len, Socke
if(answer >= 0) if(answer >= 0)
answer = AddData(data,len); answer = AddData(data,len);
/*
if(answer >= 0 && _flush_point != -1) if(answer >= 0 && _flush_point != -1)
if(_flush_point < (int)AmountBuffered()) if(_flush_point < (int)AmountBuffered())
if(Flush(sck) < 0) if(Flush(sck) < 0)
answer = -1; answer = -1;
*/
return answer; return answer;
} }

View File

@ -22,7 +22,8 @@ PUBLISHED:
inline int SetLinger(int interval_seconds = 0); inline int SetLinger(int interval_seconds = 0);
inline int DontLinger(); inline int DontLinger();
inline int SetSendBufferSize(int insize); inline int SetSendBufferSize(int insize);
inline bool ActiveOpen(const Socket_Address & theaddress); //inline bool ActiveOpen(const Socket_Address & theaddress);
inline bool ActiveOpen(const Socket_Address & theaddress, bool setdelay);
inline bool ActiveOpenNonBlocking(const Socket_Address & theaddress); inline bool ActiveOpenNonBlocking(const Socket_Address & theaddress);
inline bool ErrorIs_WouldBlocking(int err); inline bool ErrorIs_WouldBlocking(int err);
inline bool ShutdownSend(); inline bool ShutdownSend();
@ -127,12 +128,15 @@ int Socket_TCP::SetSendBufferSize(int insize)
// Description : This function will try and set the socket up for active open to a specified // Description : This function will try and set the socket up for active open to a specified
// address and port provided by the input parameter // address and port provided by the input parameter
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
bool Socket_TCP::ActiveOpen(const Socket_Address & theaddress) bool Socket_TCP::ActiveOpen(const Socket_Address & theaddress, bool setdelay)
{ {
_socket = DO_NEWTCP(); _socket = DO_NEWTCP();
if (_socket == BAD_SOCKET) if (_socket == BAD_SOCKET)
return false; return false;
if(setdelay)
SetNoDelay();
if (DO_CONNECT(_socket, &theaddress.GetAddressInfo()) != 0) if (DO_CONNECT(_socket, &theaddress.GetAddressInfo()) != 0)
return ErrorClose(); return ErrorClose();

View File

@ -23,6 +23,7 @@ public:
Time_Span(long seconds, int usecs ) ; Time_Span(long seconds, int usecs ) ;
Time_Span(const Time_Span& Time_SpanSrc); Time_Span(const Time_Span& Time_SpanSrc);
Time_Span(const Time_Clock& Time_SpanSrc); Time_Span(const Time_Clock& Time_SpanSrc);
Time_Span(float Seconds);
/////////////////// ///////////////////
@ -83,7 +84,15 @@ inline Time_Span::Time_Span(time_t time)
_my_time.tv_usec = 0; _my_time.tv_usec = 0;
_my_time.tv_sec = (long)time; _my_time.tv_sec = (long)time;
} }
//////////////////////////////////////////////////////////////
// Function name : Time_Span::Time_Span
// Description :
//////////////////////////////////////////////////////////////
inline Time_Span::Time_Span(float Seconds)
{
_my_time.tv_sec = Seconds; // this truncats .. desired result..
_my_time.tv_usec = (Seconds - (double)_my_time.tv_sec) * (double)USEC;
}
////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////
// Function name : Time_Span::Time_Span // Function name : Time_Span::Time_Span
// Description : // Description :

View File

@ -149,7 +149,7 @@ open_TCP_server_rendezvous(int port, int backlog) {
PT(Connection) ConnectionManager:: PT(Connection) ConnectionManager::
open_TCP_client_connection(const NetAddress &address, int timeout_ms) { open_TCP_client_connection(const NetAddress &address, int timeout_ms) {
Socket_TCP *socket = new Socket_TCP; Socket_TCP *socket = new Socket_TCP;
bool okflag = socket->ActiveOpen(address.get_addr()); bool okflag = socket->ActiveOpen(address.get_addr(),true);
if (!okflag) { if (!okflag) {
net_cat.error() net_cat.error()
<< "Unable to open TCP connection to server " << "Unable to open TCP connection to server "