nativenet: Assorted code clean-up

This commit is contained in:
rdb 2020-08-17 10:31:22 +02:00
parent 56ed86334b
commit 11fbdf4e6f
7 changed files with 244 additions and 272 deletions

View File

@ -6,11 +6,12 @@ set(P3NATIVENET_HEADERS
config_nativenet.h
buffered_datagramconnection.h
buffered_datagramreader.h buffered_datagramreader.I
buffered_datagramwriter.h buffered_datagramwriter.I
ringbuffer.h ringbuffer.I socket_ip.h
socket_tcp.h socket_tcp_listen.h
time_accumulator.h time_out.h
socket_address.I socket_address.h
socket_portable.h time_base.h time_span.h buffered_datagramwriter.h
socket_portable.h time_base.h time_span.h
socket_base.h socket_selector.h
socket_udp.h
socket_udp_incoming.h time_clock.h

View File

@ -1,22 +1,21 @@
/**
* A function that will peal a core message of the input buffer
* A function that will peel a core message off the input buffer
*/
inline bool Buffered_DatagramReader::
GetMessageFromBuffer(Datagram &inmsg) {
bool answer = false;
size_t DataAvail = FastAmountBeffered();
size_t DataAvail = _EndPos - _StartPos;
if (DataAvail >= sizeof(short)) {
char *ff = FastGetMessageHead();
unsigned short len=GetUnsignedShort(ff);
char *ff = _Buffer + _StartPos;
unsigned short len = *((unsigned short *)ff);
len += sizeof(unsigned short);
if (len <= DataAvail) {
inmsg.assign(ff + 2, len - 2);
_StartPos += len;
answer = true;
return true;
}
}
return answer;
return false;
}
/**
* Constructor. Passes size up to ring buffer.

View File

@ -5,87 +5,79 @@
#include "datagram.h"
#include "config_nativenet.h"
/**
*
*/
class Buffered_DatagramReader : protected RingBuffer {
private:
inline bool GetMessageFromBuffer(Datagram &inmsg);
inline unsigned short GetUnsignedShort(char * in)
{
return *((unsigned short *)in);
};
class Buffered_DatagramReader : protected RingBuffer
{
inline bool GetMessageFromBuffer(Datagram &inmsg);
public:
inline Buffered_DatagramReader(int in_size = 8192) ;
inline void ReSet(void);
// SOCK_TYPE is used to allow for abstract socket type to be used .. see
// socket_tcp and socket_ssl
inline Buffered_DatagramReader(int in_size = 8192) ;
inline void ReSet(void);
// SOCK_TYPE is used to allow for abstract socket type to be used .. see
// socket_tcp and socket_ssl
template < class SOCK_TYPE>
inline int PumpMessageReader(Datagram &inmsg, SOCK_TYPE &sck)
{
if(GetMessageFromBuffer(inmsg) == true)
return 1;
int rp = ReadPump(sck);
if(rp == 0)
return 0;
if(rp < 1)
return -1;
if(GetMessageFromBuffer(inmsg) == true)
return 1;
return 0;
template<class SOCK_TYPE>
inline int PumpMessageReader(Datagram &inmsg, SOCK_TYPE &sck) {
if (GetMessageFromBuffer(inmsg)) {
return 1;
}
int rp = ReadPump(sck);
if (rp == 0) {
return 0;
}
template < class SOCK_TYPE>
inline int ReadPump(SOCK_TYPE &sck)
{
int answer = 0;
size_t readsize = BufferAvailabe();
if(readsize < 1)
{
Compress();
readsize = BufferAvailabe();
}
if(readsize > 0)
{
char * ff = GetBufferOpen();
int gotbytes = sck.RecvData(ff,(int)readsize);
if(gotbytes < 0) // some error
{
// int er = GETERROR();
if(!sck.ErrorIs_WouldBlocking(gotbytes) )
{
answer = -3; // hard error ?
nativenet_cat.error() << "buffered_datagram_reader:ReadPump socket read error -- " << GETERROR() <<", " << sck.GetPeerName().get_ip_port().c_str() << "\n";
}
else
{
answer = 0; // try again nothing to read
}
}
else if(gotbytes > 0) // ok got some lets process it
{
_EndPos += gotbytes;
answer = 1;
}
else // 0 mean other end disconect arggggg
{
answer = -1;
nativenet_cat.error() << "buffered_datagram_reader:ReadPump other end of socket closed -- " << sck.GetPeerName().get_ip_port().c_str() << "\n";
}
}
else
{
answer = -2;
nativenet_cat.error() << "buffered_datagram_reader:ReadPump Yeep! buffer has no room to read to -- " << sck.GetPeerName().get_ip_port().c_str() << "\nBufferAvaiable = " << readsize <<" AmountBuffered = " << AmountBuffered() << " BufferSize " << GetBufferSize() << "\n";
}
return answer;
if (rp < 1) {
return -1;
}
if (GetMessageFromBuffer(inmsg)) {
return 1;
}
return 0;
}
template<class SOCK_TYPE>
inline int ReadPump(SOCK_TYPE &sck) {
int answer = 0;
size_t readsize = BufferAvailable();
if (readsize < 1) {
Compress();
readsize = BufferAvailable();
}
if (readsize > 0) {
char *ff = GetBufferOpen();
int gotbytes = sck.RecvData(ff, (int)readsize);
if (gotbytes < 0) { // some error
// int er = GETERROR();
if (!sck.ErrorIs_WouldBlocking(gotbytes)) {
nativenet_cat.error()
<< "buffered_datagram_reader:ReadPump socket read error -- " << GETERROR() <<", " << sck.GetPeerName().get_ip_port().c_str() << "\n";
return -3; // hard error ?
}
else {
return 0; // try again nothing to read
}
}
else if (gotbytes > 0) { // ok got some lets process it
_EndPos += gotbytes;
return 1;
}
else { // 0 mean other end disconect arggggg
nativenet_cat.error()
<< "buffered_datagram_reader:ReadPump other end of socket closed -- " << sck.GetPeerName().get_ip_port().c_str() << "\n";
return -1;
}
}
else {
nativenet_cat.error()
<< "buffered_datagram_reader:ReadPump Yeep! buffer has no room to read to -- " << sck.GetPeerName().get_ip_port().c_str() << "\nBufferAvailable = " << readsize <<" AmountBuffered = " << AmountBuffered() << " BufferSize " << GetBufferSize() << "\n";
return -2;
}
return answer;
}
};
#include "buffered_datagramreader.I"

View File

@ -0,0 +1,60 @@
/**
* used to clear the buffers ... use of this in mid stream is a very bad
* thing as you cannot guarantee network writes are message-aligned
*/
inline void Buffered_DatagramWriter::
ReSet(void) {
ResetContent();
}
/**
*
*/
inline Buffered_DatagramWriter::
Buffered_DatagramWriter(size_t in_size, int in_flush_point) : RingBuffer(in_size) {
_flush_point = in_flush_point;
}
/**
*
*/
inline int Buffered_DatagramWriter::
AddData(const void * data, size_t len, Socket_TCP &sck) {
int answer = 0;
if (len > BufferAvailable()) {
answer = Flush(sck);
}
if (answer >= 0) {
answer = AddData(data,len);
}
if (answer >= 0 && _flush_point != -1) {
if (_flush_point < (int)AmountBuffered()) {
if (Flush(sck) < 0) {
answer = -1;
}
}
}
return answer;
}
/**
*
*/
inline int Buffered_DatagramWriter::
AddData(const void *data, size_t len) {
if (BufferAvailable() > len + 2) {
unsigned short len1(len);
TS_GetInteger(len1, (char *)&len1);
if (Put((char *)&len1, sizeof(len1))) {
if (Put((char *)data, len)) {
return 1;
}
}
}
return -1;
}

View File

@ -1,7 +1,8 @@
#ifndef BufferedWriter_H
#define BufferedWriter_H
#ifndef BUFFERED_DATAGRAMWRITER_H
#define BUFFERED_DATAGRAMWRITER_H
#include "ringbuffer.h"
/**
* This is the buffered writer.. it is used to buffer up Coremessages and
* arbitrary data..
@ -15,115 +16,64 @@
* no partial message rights at least to the TCP layer..
*
*/
class Buffered_DatagramWriter : public RingBuffer
{
int _flush_point;
class Buffered_DatagramWriter : public RingBuffer {
public:
inline void ReSet(void); // destroy all buffered data
inline void ReSet(void); // destroy all buffered data
Buffered_DatagramWriter( size_t in_size , int in_flush_point = -1);
inline int AddData(const void * data, size_t len, Socket_TCP &sck);
inline int AddData(const void * data, size_t len);
// THE FUNCTIONS THAT TAKE A SOCKET NEED TO BE TEMPLATED TO WORK..
Buffered_DatagramWriter(size_t in_size, int in_flush_point = -1);
inline int AddData(const void *data, size_t len, Socket_TCP &sck);
inline int AddData(const void *data, size_t len);
template < class SOCK_TYPE>
int FlushNoBlock(SOCK_TYPE &sck) { // this is the ugly part
int answer = 0;
size_t Writesize = AmountBuffered();
if(Writesize > 0) {
int Writen = sck.SendData(GetMessageHead(),(int)Writesize);
if(Writen > 0) {
_StartPos += Writen;
FullCompress();
if(AmountBuffered() > 0) // send 0 if empty else send 1 for more to do
answer = 1;
}
else if(Writen < 0) {
if(!sck.ErrorIs_WouldBlocking(Writen))
answer = -1;
else
answer = 1; // 1 = more to do.....
}
}
return answer;
};
template < class SOCK_TYPE>
inline int Flush(SOCK_TYPE &sck) {
int answer = 0;
size_t Writesize = AmountBuffered();
if(Writesize > 0) {
int Writen = sck.SendData(GetMessageHead(),(int)Writesize);
if(Writen > 0) {
_StartPos += Writen;
FullCompress();
if(AmountBuffered() > 0) //send 0 if empty else send 1 for more to do
answer = 1;
}
else if(Writen < 0) {
if(sck.ErrorIs_WouldBlocking(Writen) != true)
answer = -1;
}
}
return answer;
};
};
/**
* used to clear the buffrers ... use of this in mid stream is a very bad
* thing as you can not guarany network writes are message alligned
*/
inline void Buffered_DatagramWriter::ReSet(void) {
ResetContent();
}
// Buffered_DatagramWriter::Buffered_DatagramWriter
inline Buffered_DatagramWriter::Buffered_DatagramWriter( size_t in_size , int in_flush_point) : RingBuffer(in_size) {
_flush_point = in_flush_point;
}
/**
*
*/
inline int Buffered_DatagramWriter::AddData(const void * data, size_t len, Socket_TCP &sck) {
int answer = 0;
if(len > BufferAvailabe())
answer = Flush(sck);
if(answer >= 0)
answer = AddData(data,len);
if(answer >= 0 && _flush_point != -1)
if(_flush_point < (int)AmountBuffered())
if(Flush(sck) < 0)
answer = -1;
return answer;
}
/**
*
*/
inline int Buffered_DatagramWriter::AddData(const void * data, size_t len)
{
int answer = -1;
if(BufferAvailabe() > len+2) {
unsigned short len1(len);
TS_GetInteger(len1,(char *)&len1);
if(Put((char *)&len1,sizeof(len1)) == true) {
if(Put((char *)data,len) == true) {
answer = 1;
// THE FUNCTIONS THAT TAKE A SOCKET NEED TO BE TEMPLATED TO WORK..
template<class SOCK_TYPE>
int FlushNoBlock(SOCK_TYPE &sck) { // this is the ugly part
size_t writesize = AmountBuffered();
if (writesize > 0) {
int written = sck.SendData(GetMessageHead(), (int)writesize);
if (written > 0) {
_StartPos += written;
FullCompress();
if (AmountBuffered() > 0) {// send 0 if empty else send 1 for more to do
return 1;
}
}
else if (written < 0) {
if (!sck.ErrorIs_WouldBlocking(written)) {
return -1;
} else {
return 1; // 1 = more to do.....
}
}
}
}
return 0;
};
return answer;
}
#endif //BufferedWriter_H
template<class SOCK_TYPE>
inline int Flush(SOCK_TYPE &sck) {
size_t writesize = AmountBuffered();
if (writesize > 0) {
int written = sck.SendData(GetMessageHead(),(int)writesize);
if (written > 0) {
_StartPos += written;
FullCompress();
if (AmountBuffered() > 0) { //send 0 if empty else send 1 for more to do
return 1;
}
}
else if(written < 0) {
if (!sck.ErrorIs_WouldBlocking(written)) {
return -1;
}
}
}
return 0;
};
private:
int _flush_point;
};
#include "buffered_datagramwriter.I"
#endif // BUFFERED_DATAGRAMWRITER_H

View File

@ -2,26 +2,26 @@
/**
* This will get a pointer to the fist undelivered data in buffer
*/
inline char * RingBuffer::GetMessageHead(void)
{
return _Buffer+_StartPos;
inline char *RingBuffer::
GetMessageHead(void) {
return _Buffer + _StartPos;
}
/**
* This will get the first writabe section of the buffer space
*/
inline char * RingBuffer::GetBufferOpen(void)
{
return _Buffer+_EndPos;
inline char *RingBuffer::
GetBufferOpen(void) {
return _Buffer + _EndPos;
}
/**
* Will force a compression of data // shift left to start position
*/
inline void RingBuffer::ForceWindowSlide(void)
{
inline void RingBuffer::
ForceWindowSlide(void) {
size_t len = AmountBuffered();
if(len > 0 && _StartPos != 0) // basic flush left..
{
memmove(_Buffer,GetMessageHead(),len);
if(len > 0 && _StartPos != 0) { // basic flush left..
memmove(_Buffer, GetMessageHead(), len);
_StartPos = 0;
_EndPos = len;
}
@ -29,120 +29,95 @@ inline void RingBuffer::ForceWindowSlide(void)
/**
* Will report the number of unread chars in buffer
*/
inline size_t RingBuffer::AmountBuffered(void)
{
inline size_t RingBuffer::
AmountBuffered(void) {
return _EndPos - _StartPos;
}
/**
* Will report amount of data that is contiguas that can be writen at the
* Will report amount of data that is contiguous that can be written at the
* location returned by GetBufferOpen
*/
inline size_t RingBuffer::BufferAvailabe(void)
{
inline size_t RingBuffer::
BufferAvailable(void) {
return GetBufferSize() - _EndPos;
}
/**
* Throw away all inread information
*/
void RingBuffer::ResetContent(void)
{
void RingBuffer::
ResetContent(void) {
_StartPos = 0;
_EndPos = 0;
}
/**
*
*/
inline RingBuffer::RingBuffer(size_t in_size) : MemBuffer(in_size)
{
inline RingBuffer::
RingBuffer(size_t in_size) : MemBuffer(in_size) {
_EndPos = 0;
_StartPos = 0;
}
/**
* Force a compress of the data
*/
inline void RingBuffer::FullCompress(void)
{
if(_StartPos == _EndPos)
{
inline void RingBuffer::
FullCompress(void) {
if (_StartPos == _EndPos) {
_StartPos = 0;
_EndPos = 0;
}
else
{
else {
ForceWindowSlide();
}
}
/**
* Try and do a intelegent compress of the data space the algorithem is really
* Try and do a intelligent compress of the data space the algorithm is really
* stupid right know.. just say if i have read past 1/2 my space do a
* compress...Im open for sugestions
*
*
*/
inline void RingBuffer::Compress(void)
{
if(_StartPos == _EndPos)
{
inline void RingBuffer::
Compress(void) {
if (_StartPos == _EndPos) {
_StartPos = 0;
_EndPos = 0;
}
else if(_StartPos >= GetBufferSize() / 2)
{
else if (_StartPos >= GetBufferSize() / 2) {
ForceWindowSlide();
}
}
/**
* Adds Data to a ring Buffer Will do a compress if needed so pointers suplied
* by Get Call are no longer valide
*
*/
inline bool RingBuffer::Put(const char * data, size_t len)
{
bool answer = false;
if(len > BufferAvailabe() )
/**
* Adds data to a ring buffer. Will do a compress if needed, so pointers
* supplied by get call are no longer valid.
*/
inline bool RingBuffer::
Put(const char *data, size_t len) {
if (len > BufferAvailable()) {
Compress();
if(len <= BufferAvailabe() )
{
memcpy(GetBufferOpen(),data,len);
_EndPos += len;
answer = true;
}
return answer;
}
/**
*
*
*/
inline bool RingBuffer::PutFast(const char * data, size_t len)
{
// no checking be carefull
memcpy(GetBufferOpen(),data,len); // should i be using memcopy..
_EndPos += len;
return true;
if (len <= BufferAvailable()) {
memcpy(GetBufferOpen(), data, len);
_EndPos += len;
return true;
}
return false;
}
/**
* will copy the data .. false indicates not enogh data to read .. sorry...
*
*/
inline bool RingBuffer::Get(char * data, size_t len)
{
bool answer = false;
if(len <= AmountBuffered() )
{
memcpy(data,GetMessageHead(),len);
inline bool RingBuffer::
Get(char *data, size_t len) {
if (len <= AmountBuffered()) {
memcpy(data, GetMessageHead(), len);
_StartPos += len;
Compress();
answer = true;
return true;
}
return answer;
return false;
}

View File

@ -11,14 +11,9 @@ protected:
inline char *GetBufferOpen(void);
inline void ForceWindowSlide(void);
#define FastGetMessageHead() (_Buffer + _StartPos)
#define FastAmountBeffered() (_EndPos - _StartPos)
inline bool PutFast(const char * data, size_t len);
public:
inline size_t AmountBuffered(void);
inline size_t BufferAvailabe(void);
inline size_t BufferAvailable(void);
inline void ResetContent(void);
inline RingBuffer(size_t in_size = 4096);