okay, now it works!

This commit is contained in:
Zachary Pavlov 2006-11-10 04:21:08 +00:00
parent 3fb756cfb2
commit b29c58e3f5
3 changed files with 110 additions and 113 deletions

View File

@ -64,39 +64,30 @@ protected:
virtual void NewWriteBuffer(void) { }; virtual void NewWriteBuffer(void) { };
/////////////////////////////////////////// ///////////////////////////////////////////
inline void ClearAll(void); inline void ClearAll(void);
inline bool DoConnect(void); // all the real state magic is in here ...
inline bool SendMessageBufferOnly(Datagram &msg); // do not use this .. this is a way for the the COnnecting UPcall to drop messages in queue first.. inline bool SendMessageBufferOnly(Datagram &msg); // do not use this .. this is a way for the the COnnecting UPcall to drop messages in queue first..
public: public:
inline bool GetMessageInternal(Datagram **val); inline bool GetMessage(Datagram &val);
PUBLISHED: PUBLISHED:
inline bool IsConnected(void) { return DoConnect(); }; inline bool DoConnect(void); // all the real state magic is in here
inline bool IsConnected(void);
inline Buffered_DatagramConnection(bool do_blocking_writes, int rbufsize, int wbufsize, int write_flush_point) ; inline Buffered_DatagramConnection(bool do_blocking_writes, int rbufsize, int wbufsize, int write_flush_point) ;
virtual ~Buffered_DatagramConnection(void) ; virtual ~Buffered_DatagramConnection(void) ;
// the reason thsi all exists // the reason thsi all exists
inline bool SendMessage(const Datagram &msg); inline bool SendMessage(const Datagram &msg);
inline Datagram * GetMessage();
inline bool Flush(void); inline bool Flush(void);
inline void Reset(void); inline void Reset(void);
// address queue stuff // address queue stuff
inline size_t AddressQueueSize() { return _Addresslist.size(); }; inline size_t AddressQueueSize() { return _Addresslist.size(); };
inline void AddAddress(Socket_Address &inadr); inline void AddAddress(Socket_Address &inadr);
inline void ClearAddresses(void); inline void ClearAddresses(void);
private: private:
Buffered_DatagramWriter _Writer; // buffered writing Buffered_DatagramWriter _Writer; // buffered writing
Buffered_DatagramReader _Reader; // buffered reader Buffered_DatagramReader _Reader; // buffered reader
AddressQueue _Addresslist; // the location of the round robin address list AddressQueue _Addresslist; // the location of the round robin address list
Socket_Address _Adddress; // the conection address ( active one from list being used) Socket_Address _Adddress; // the conection address ( active one from list being used)
// the local datagram store..
Datagram _Msg; // The temp storage for a upcalled message
////////////////////////////////
// connection state engine /////
Time_Out _LastConnectTry; // A Recycle Timer to Stop Connection/spaming..
bool _tryingToOpen; // this is a flag that say we are in state 2
// state 1 = Active() == false,,
// state 2 = Active() == true and _tryingToOpen == true;
// state 3 = Active() == true and _tryingToOpen == flase;
friend class Buffered_DatagramReader; friend class Buffered_DatagramReader;
friend class Buffered_DatagramWriter; friend class Buffered_DatagramWriter;
@ -111,11 +102,27 @@ private:
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
inline void Buffered_DatagramConnection::ClearAll(void) { inline void Buffered_DatagramConnection::ClearAll(void) {
Close(); Close();
_tryingToOpen = false;
_Writer.ReSet(); _Writer.ReSet();
_Reader.ReSet(); _Reader.ReSet();
} }
inline bool Buffered_DatagramConnection::DoConnect(void) {
if(!_Addresslist.GetNext(_Adddress)) // lookup the proper value...
return false;
if(ActiveOpen(_Adddress) == true) {
SetNonBlocking(); // maybe should be blocking?
SetSendBufferSize(1024*50); // we need to hand tune these for the os we are using
SetRecvBufferSize(1024*50);
NewWriteBuffer();
return true;
}
return false;
}
/*
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function name : Buffered_DatagramConnection::DoConnect // Function name : Buffered_DatagramConnection::DoConnect
// Description : This is the function thah does the conection for us // Description : This is the function thah does the conection for us
@ -126,7 +133,7 @@ inline void Buffered_DatagramConnection::ClearAll(void) {
inline bool Buffered_DatagramConnection::DoConnect(void) { inline bool Buffered_DatagramConnection::DoConnect(void) {
if(Active() != true) { if(Active() != true) {
if(_LastConnectTry.Expired() != true) if(_LastConnectTry.Expired() != true)
return false; return true;
if(!_Addresslist.GetNext(_Adddress)) // lookup the proper value... if(!_Addresslist.GetNext(_Adddress)) // lookup the proper value...
return false; return false;
@ -141,7 +148,7 @@ inline bool Buffered_DatagramConnection::DoConnect(void) {
return true; return true;
} }
return false; return true;
} }
if(_tryingToOpen) { // okay handle the i am connecting state.... if(_tryingToOpen) { // okay handle the i am connecting state....
@ -161,6 +168,9 @@ inline bool Buffered_DatagramConnection::DoConnect(void) {
} }
return true; return true;
} }
*/
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function name : Buffered_DatagramConnection::~Buffered_DatagramConnection // Function name : Buffered_DatagramConnection::~Buffered_DatagramConnection
// Description : // Description :
@ -182,9 +192,9 @@ inline Buffered_DatagramConnection::~Buffered_DatagramConnection(void)
// Argument : int wbufsize // Argument : int wbufsize
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
inline Buffered_DatagramConnection::Buffered_DatagramConnection(bool do_blocking_writes, int rbufsize, int wbufsize, int write_flush_point) inline Buffered_DatagramConnection::Buffered_DatagramConnection(bool do_blocking_writes, int rbufsize, int wbufsize, int write_flush_point)
: _Writer(do_blocking_writes,wbufsize,write_flush_point) , _Reader(rbufsize), _Msg() , _LastConnectTry(Time_Span(0,0,0,1,0)) : _Writer(do_blocking_writes,wbufsize,write_flush_point) , _Reader(rbufsize)
{ {
_LastConnectTry.ForceToExpired();
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function name : Buffered_DatagramConnection::SendMessage // Function name : Buffered_DatagramConnection::SendMessage
@ -195,25 +205,18 @@ inline Buffered_DatagramConnection::Buffered_DatagramConnection(bool do_blocking
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
inline bool Buffered_DatagramConnection::SendMessage(const Datagram &msg) inline bool Buffered_DatagramConnection::SendMessage(const Datagram &msg)
{ {
if(DoConnect() == true) if(IsConnected()) {
{
// printf(" DO SendMessage %d\n",msg.get_length()); // printf(" DO SendMessage %d\n",msg.get_length());
int val = 0; int val = 0;
if(_tryingToOpen) // indicates we are in the process of opening the connection ....just buffer val = _Writer.AddData(msg.get_data(),msg.get_length(),*this);
{
val = _Writer.AddData(msg.get_data(),msg.get_length()); if(val >= 0)
} return true;
else
{
val = _Writer.AddData(msg.get_data(),msg.get_length(),*this);
}
if(val >= 0)
return true;
// LOGWARNING("Buffered_DatagramConnection::SendMessage->Error On Write--Out Buffer = %d",_Writer.AmountBuffered()); // LOGWARNING("Buffered_DatagramConnection::SendMessage->Error On Write--Out Buffer = %d",_Writer.AmountBuffered());
ClearAll(); ClearAll();
} }
return false; return false;
} }
inline bool Buffered_DatagramConnection::SendMessageBufferOnly(Datagram &msg) inline bool Buffered_DatagramConnection::SendMessageBufferOnly(Datagram &msg)
@ -244,7 +247,7 @@ inline void Buffered_DatagramConnection::ClearAddresses(void)
_Addresslist.clear(); _Addresslist.clear();
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function name : Buffered_DatagramConnection::GetMessage // Function name : Buffered_DatagramConnection::GetMessageInternal
// Description : read a message // Description : read a message
// //
// false means something bad happened.. // false means something bad happened..
@ -253,34 +256,22 @@ inline void Buffered_DatagramConnection::ClearAddresses(void)
// Return type : inline bool // Return type : inline bool
// Argument : DataGram **val // Argument : DataGram **val
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
inline bool Buffered_DatagramConnection::GetMessageInternal(Datagram **val) inline bool Buffered_DatagramConnection::GetMessage(Datagram &val)
{ {
*val = NULL; if(IsConnected()) {
if(DoConnect() == true && _tryingToOpen != true) int ans1 = _Reader.PumpMessageReader(val,*this);
{ if(ans1 == 0)
int ans1 = _Reader.PumpMessageReader(_Msg,*this); return false;
if(ans1 <0) if(ans1 <0) {
{ // LOGWARNING("Buffered_DatagramConnection::GetMessage->Error On PumpMessageReader--Out Buffer = %d",_Writer.AmountBuffered());
// LOGWARNING("Buffered_DatagramConnection::GetMessage->Error On PumpMessageReader--Out Buffer = %d",_Writer.AmountBuffered()); ClearAll();
ClearAll(); return false;
return false; }
} return true;
else if (ans1 == 1) }
{ return false;
*val = &_Msg;
}
return true;
}
return false;
} }
inline Datagram * Buffered_DatagramConnection::GetMessage()
{
Datagram *out = NULL;
GetMessageInternal(&out);
return out;
}
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function name : Buffered_DatagramConnection::Flush // Function name : Buffered_DatagramConnection::Flush
// Description : flush all wrightes // Description : flush all wrightes
@ -290,7 +281,7 @@ inline Datagram * Buffered_DatagramConnection::GetMessage()
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
bool Buffered_DatagramConnection::Flush(void) bool Buffered_DatagramConnection::Flush(void)
{ {
if(Active() == true && _tryingToOpen != true ) if (IsConnected())
{ {
int flush_resp = _Writer.FlushNoBlock(*this); int flush_resp = _Writer.FlushNoBlock(*this);
if(flush_resp < 0) if(flush_resp < 0)
@ -305,7 +296,7 @@ bool Buffered_DatagramConnection::Flush(void)
return false; return false;
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function name : Buffered_DatagramConnection::Flush // Function name : Buffered_DatagramConnection::Reset
// Description : Reset // Description : Reset
// //
// Return type : void // Return type : void
@ -317,5 +308,10 @@ inline void Buffered_DatagramConnection::Reset()
}; };
inline bool Buffered_DatagramConnection::IsConnected(void) {
return ( Active() == true );
}
#endif //__NONECLOCKING_CONNECTTION_H_ #endif //__NONECLOCKING_CONNECTTION_H_

View File

@ -1,49 +1,49 @@
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function name : Buffered_DatagramReader::GetMessageInplace // Function name : Buffered_DatagramReader::GetMessageInplace
// Description : A function that will peal a core message of the input buffer // Description : A function that will peal a core message of the input buffer
// //
// Return type : inline bool // Return type : inline bool
// Argument : CoreMessage &inmsg // Argument : CoreMessage &inmsg
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
inline bool Buffered_DatagramReader::GetMessageFromBuffer(Datagram &inmsg) inline bool Buffered_DatagramReader::GetMessageFromBuffer(Datagram &inmsg)
{ {
bool answer = false; bool answer = false;
size_t DataAvail = FastAmountBeffered(); size_t DataAvail = FastAmountBeffered();
if(DataAvail >= sizeof(short)) if(DataAvail >= sizeof(short))
{ {
char *ff = FastGetMessageHead(); char *ff = FastGetMessageHead();
unsigned short len=GetUnsignedShort(ff); unsigned short len=GetUnsignedShort(ff);
len += sizeof(unsigned short); len += sizeof(unsigned short);
if(len <= DataAvail) if(len <= DataAvail)
{ {
inmsg.assign(ff,len-2); inmsg.assign(ff+2,len-2);
_StartPos += len; _StartPos += len;
answer = true; answer = true;
} }
} }
return answer; return answer;
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function name : Buffered_DatagramReader::Buffered_DatagramReader // Function name : Buffered_DatagramReader::Buffered_DatagramReader
// Description : constructore .. passes size up to ring buffer // Description : constructore .. passes size up to ring buffer
// //
// Return type : inline // Return type : inline
// Argument : int in_size // Argument : int in_size
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
inline Buffered_DatagramReader::Buffered_DatagramReader(int in_size) : RingBuffer(in_size) inline Buffered_DatagramReader::Buffered_DatagramReader(int in_size) : RingBuffer(in_size)
{ {
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function name : Buffered_DatagramReader::ReSet // Function name : Buffered_DatagramReader::ReSet
// Description : Reaset all read content.. IE zero's out buffer... // Description : Reaset all read content.. IE zero's out buffer...
// //
// If you lose framing this will not help // If you lose framing this will not help
// //
// Return type : inline void // Return type : inline void
// Argument : void // Argument : void
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
inline void Buffered_DatagramReader::ReSet(void) inline void Buffered_DatagramReader::ReSet(void)
{ {
ResetContent(); ResetContent();
} }

View File

@ -120,11 +120,12 @@ inline int Buffered_DatagramWriter::AddData(const void * data, size_t len, Socke
if(answer >= 0) if(answer >= 0)
answer = AddData(data,len); answer = AddData(data,len);
/*
if(answer >= 0 && _flush_point != -1) if(answer >= 0 && _flush_point != -1)
if(_flush_point < (int)AmountBuffered()) if(_flush_point < (int)AmountBuffered())
if(Flush(sck) < 0) if(Flush(sck) < 0)
answer = -1; answer = -1;
*/
return answer; return answer;
} }