non-blocking I/O

This commit is contained in:
David Rose 2002-10-17 01:41:37 +00:00
parent a33c3414b9
commit 36d8e8feb3
9 changed files with 1264 additions and 670 deletions

View File

@ -75,3 +75,25 @@ INLINE BIO *BioPtr::
get_bio() const {
return _bio;
}
////////////////////////////////////////////////////////////////////
// Function: BioPtr::get_server_name
// Access: Public
// Description: Returns the name of the server we are (or should be)
// connected to.
////////////////////////////////////////////////////////////////////
INLINE const string &BioPtr::
get_server_name() const {
return _server_name;
}
////////////////////////////////////////////////////////////////////
// Function: BioPtr::get_port
// Access: Public
// Description: Returns the port on which we are (or should be)
// connected.
////////////////////////////////////////////////////////////////////
INLINE int BioPtr::
get_port() const {
return _port;
}

View File

@ -52,6 +52,8 @@ public:
INLINE BIO *get_bio() const;
bool connect() const;
INLINE const string &get_server_name() const;
INLINE int get_port() const;
private:
BIO *_bio;

View File

@ -103,6 +103,7 @@ underflow() {
// Oops, we didn't read what we thought we would.
if (read_count <= 0) {
_is_closed = !BIO_should_retry(*_source);
gbump(num_bytes);
return EOF;
}

View File

@ -116,6 +116,7 @@ underflow() {
if (read_count != num_bytes) {
// Oops, we didn't read what we thought we would.
if (read_count == 0) {
gbump(num_bytes);
return EOF;
}
@ -146,16 +147,16 @@ read_chars(char *start, size_t length) {
// Extract some of the bytes remaining in the chunk.
length = min(length, _chunk_remaining);
(*_source)->read(start, length);
length = (*_source)->gcount();
_chunk_remaining -= length;
return length;
size_t read_count = (*_source)->gcount();
_chunk_remaining -= read_count;
return read_count;
}
// Read the next chunk.
string line;
getline(**_source, line);
if (!line.empty() && line[line.length() - 1] == '\r') {
line = line.substr(0, line.length() - 1);
if (!http_getline(line)) {
// EOF (or data unavailable) while trying to read the chunk size.
return 0;
}
size_t chunk_size = (size_t)strtol(line.c_str(), NULL, 16);
if (chunk_size == 0) {
@ -175,4 +176,40 @@ read_chars(char *start, size_t length) {
return read_chars(start, length);
}
////////////////////////////////////////////////////////////////////
// Function: ChunkedStreamBuf::http_getline
// Access: Private
// Description: Reads a single line from the stream. Returns
// true if the line is successfully retrieved, or false
// if a complete line has not yet been received or if
// the connection has been closed.
////////////////////////////////////////////////////////////////////
bool ChunkedStreamBuf::
http_getline(string &str) {
nassertr(!_source.is_null(), false);
int ch = (*_source)->get();
while (!(*_source)->eof() && !(*_source)->fail()) {
switch (ch) {
case '\n':
// end-of-line character, we're done.
if (downloader_cat.is_spam()) {
downloader_cat.spam() << "recv: " << _working_getline << "\n";
}
str = _working_getline;
_working_getline = string();
return true;
case '\r':
// Ignore CR characters.
break;
default:
_working_getline += (char)ch;
}
ch = (*_source)->get();
}
return false;
}
#endif // HAVE_SSL

View File

@ -47,10 +47,12 @@ protected:
private:
size_t read_chars(char *start, size_t length);
bool http_getline(string &str);
PT(BioStreamPtr) _source;
size_t _chunk_remaining;
bool _done;
string _working_getline;
PT(HTTPChannel) _doc;
int _read_index;

View File

@ -26,7 +26,8 @@
////////////////////////////////////////////////////////////////////
INLINE bool HTTPChannel::
is_valid() const {
return (!_source.is_null() && (_status_code / 100) == 2);
return (!_source.is_null() && _state != S_failure &&
(_status_code / 100) == 2);
}
////////////////////////////////////////////////////////////////////
@ -192,7 +193,9 @@ get_file_size() const {
////////////////////////////////////////////////////////////////////
INLINE bool HTTPChannel::
post_form(const URLSpec &url, const string &body) {
return send_request("POST", url, body);
begin_request("POST", url, body, false);
run();
return is_valid();
}
////////////////////////////////////////////////////////////////////
@ -203,7 +206,9 @@ post_form(const URLSpec &url, const string &body) {
////////////////////////////////////////////////////////////////////
INLINE bool HTTPChannel::
get_document(const URLSpec &url) {
return send_request("GET", url, string());
begin_request("GET", url, string(), false);
run();
return is_valid();
}
////////////////////////////////////////////////////////////////////
@ -217,5 +222,75 @@ get_document(const URLSpec &url) {
////////////////////////////////////////////////////////////////////
INLINE bool HTTPChannel::
get_header(const URLSpec &url) {
return send_request("HEAD", url, string());
begin_request("HEAD", url, string(), false);
run();
return is_valid();
}
////////////////////////////////////////////////////////////////////
// Function: HTTPChannel::request_post_form
// Access: Published
// Description: Posts form data to a particular URL and retrieves the
// response, all using non-blocking I/O. See
// request_document() and post_form().
//
// It is important to note that you *must* call run()
// repeatedly after calling this method until run()
// returns false, and you may not call any other
// document posting or retrieving methods using the
// HTTPChannel object in the interim, or your form data
// may not get posted.
////////////////////////////////////////////////////////////////////
INLINE void HTTPChannel::
request_post_form(const URLSpec &url, const string &body) {
begin_request("POST", url, body, true);
}
////////////////////////////////////////////////////////////////////
// Function: HTTPChannel::request_document
// Access: Published
// Description: Begins a non-blocking request to retrieve a given
// document. This method will return immediately, even
// before a connection to the server has necessarily
// been established; you must then call run() from time
// to time until the return value of run() is false.
// Then you may check is_valid() and get_status_code()
// to determine the status of your request.
//
// If a previous request had been pending, that request
// is discarded.
////////////////////////////////////////////////////////////////////
INLINE void HTTPChannel::
request_document(const URLSpec &url) {
begin_request("GET", url, string(), true);
}
////////////////////////////////////////////////////////////////////
// Function: HTTPChannel::request_header
// Access: Published
// Description: Begins a non-blocking request to retrieve a given
// header. See request_document() and get_header().
////////////////////////////////////////////////////////////////////
INLINE void HTTPChannel::
request_header(const URLSpec &url) {
begin_request("HEAD", url, string(), true);
}
////////////////////////////////////////////////////////////////////
// Function: HTTPChannel::check_socket
// Access: Private
// Description: Checks whether the connection to the server has been
// closed after a failed read. If it has, issues a
// warning and calls free_bio().
////////////////////////////////////////////////////////////////////
INLINE void HTTPChannel::
check_socket() {
nassertv(!_source.is_null());
if ((*_source)->is_closed()) {
if (downloader_cat.is_debug()) {
downloader_cat.debug()
<< "Lost connection to server unexpectedly during read.\n";
}
free_bio();
}
}

File diff suppressed because it is too large Load Diff

View File

@ -35,6 +35,7 @@
#include "bioStreamPtr.h"
#include "pmap.h"
#include "pointerTo.h"
#include "config_downloader.h"
#include <openssl/ssl.h>
class HTTPClient;
@ -58,11 +59,6 @@ class EXPCL_PANDAEXPRESS HTTPChannel : public VirtualFile {
private:
HTTPChannel(HTTPClient *client);
bool send_request(const string &method, const URLSpec &url,
const string &body);
bool send_request(const string &header, const string &body,
bool allow_reconnect);
public:
virtual ~HTTPChannel();
@ -96,26 +92,48 @@ PUBLISHED:
INLINE bool get_document(const URLSpec &url);
INLINE bool get_header(const URLSpec &url);
INLINE void request_post_form(const URLSpec &url, const string &body);
INLINE void request_document(const URLSpec &url);
INLINE void request_header(const URLSpec &url);
bool run();
ISocketStream *read_body();
private:
bool establish_connection();
bool establish_http();
bool establish_https();
bool establish_http_proxy();
bool establish_https_proxy();
bool run_connecting();
bool run_proxy_ready();
bool run_proxy_request_sent();
bool run_proxy_reading_header();
bool run_setup_ssl();
bool run_ssl_handshake();
bool run_ready();
bool run_request_sent();
bool run_reading_header();
bool run_read_header();
bool run_begin_body();
bool run_reading_body();
bool run_read_body();
bool run_read_trailer();
bool make_https_connection();
void begin_request(const string &method, const URLSpec &url,
const string &body, bool nonblocking);
bool http_getline(string &str);
bool http_send(const string &str);
bool parse_http_response(const string &line);
bool parse_http_header();
INLINE void check_socket();
bool verify_server(X509_NAME *subject) const;
static string get_x509_name_component(X509_NAME *name, int nid);
static bool x509_name_subset(X509_NAME *name_a, X509_NAME *name_b);
void make_header(string &header, const string &method,
const URLSpec &url, const string &body);
void make_header();
void make_proxy_request_text();
void make_request_text(const string &authorization);
void set_url(const URLSpec &url);
void issue_request(const string &header, const string &body);
void read_http_response();
void store_header_field(const string &field_name, const string &field_value);
bool get_authorization(string &authorization,
const string &authenticate_request,
@ -129,7 +147,6 @@ private:
static void show_send(const string &message);
#endif
bool prepare_for_next(bool allow_reconnect);
void free_bio();
HTTPClient *_client;
@ -137,18 +154,13 @@ private:
PT(BioPtr) _bio;
PT(BioStreamPtr) _source;
bool _persistent_connection;
bool _nonblocking;
URLSpec _url;
string _method;
string _header;
string _body;
enum State {
S_new,
S_read_header,
S_started_body,
S_read_body,
S_read_trailer
};
State _state;
int _read_index;
HTTPClient::HTTPVersion _http_version;
@ -163,6 +175,45 @@ private:
size_t _file_size;
// These members are used to maintain the current state while
// communicating with the server. We need to store everything in
// the class object instead of using local variables because in the
// case of nonblocking I/O we have to be able to return to the
// caller after any I/O operation and resume later where we left
// off.
enum State {
S_new,
S_connecting,
S_proxy_ready,
S_proxy_request_sent,
S_proxy_reading_header,
S_setup_ssl,
S_ssl_handshake,
S_ready,
S_request_sent,
S_reading_header,
S_read_header,
S_begin_body,
S_reading_body,
S_read_body,
S_read_trailer,
S_failure
};
State _state;
State _done_state;
string _proxy_header;
string _proxy_request_text;
bool _proxy_tunnel;
string _request_text;
string _working_getline;
size_t _sent_so_far;
string _current_field_name;
string _current_field_value;
ISocketStream *_body_stream;
BIO *_sbio;
pset<URLSpec> _redirect_trail;
int _last_status_code;
typedef pmap<string, string> Tokens;
typedef pmap<string, Tokens> AuthenticationSchemes;
static void parse_authentication_schemes(AuthenticationSchemes &schemes,

View File

@ -111,6 +111,7 @@ underflow() {
if (read_count != num_bytes) {
// Oops, we didn't read what we thought we would.
if (read_count == 0) {
gbump(num_bytes);
return EOF;
}
@ -133,33 +134,24 @@ underflow() {
////////////////////////////////////////////////////////////////////
size_t IdentityStreamBuf::
read_chars(char *start, size_t length) {
size_t read_count = 0;
if (!_has_content_length) {
// If we have no restrictions on content length, read till end of
// file.
(*_source)->read(start, length);
length = (*_source)->gcount();
if (length == 0) {
// End of file; we're done.
if (_doc != (HTTPChannel *)NULL && _read_index == _doc->_read_index) {
// An IdentityStreamBuf doesn't have a trailer, so we've already
// "read" it.
_doc->_state = HTTPChannel::S_read_trailer;
}
}
read_count = (*_source)->gcount();
} else {
if (_bytes_remaining == 0) {
return 0;
}
// Extract some of the bytes remaining in the chunk.
length = min(length, _bytes_remaining);
(*_source)->read(start, length);
length = (*_source)->gcount();
_bytes_remaining -= length;
if (_bytes_remaining != 0) {
length = min(length, _bytes_remaining);
(*_source)->read(start, length);
read_count = (*_source)->gcount();
_bytes_remaining -= read_count;
}
if (_bytes_remaining == 0) {
// We're done.
if (_doc != (HTTPChannel *)NULL && _read_index == _doc->_read_index) {
@ -169,8 +161,20 @@ read_chars(char *start, size_t length) {
}
}
}
if (read_count == 0) {
if ((*_source)->is_closed()) {
// socket closed; we're done.
if (_doc != (HTTPChannel *)NULL && _read_index == _doc->_read_index) {
// An IdentityStreamBuf doesn't have a trailer, so we've already
// "read" it.
_doc->_state = HTTPChannel::S_read_trailer;
}
}
return 0;
}
return length;
return read_count;
}
#endif // HAVE_SSL