From 4d8da3cc5a177dac9b666b8115c7ee8da4c52203 Mon Sep 17 00:00:00 2001 From: Mike Goslin Date: Thu, 11 Jan 2001 20:12:51 +0000 Subject: [PATCH] *** empty log message *** --- panda/src/downloader/decompressor.cxx | 109 ++++++++- panda/src/downloader/decompressor.h | 5 + panda/src/downloader/downloadDb.cxx | 32 ++- panda/src/downloader/downloadDb.h | 8 +- panda/src/downloader/downloader.cxx | 224 ++++++++++++++++++ panda/src/downloader/downloader.h | 7 + panda/src/downloader/zcompressor.cxx | 2 +- panda/src/downloader/zcompressor.h | 2 +- panda/src/downloadertools/Sources.pp | 2 + panda/src/downloadertools/test_downloader.cxx | 28 ++- panda/src/express/buffer.I | 9 + panda/src/express/buffer.h | 12 + 12 files changed, 412 insertions(+), 28 deletions(-) diff --git a/panda/src/downloader/decompressor.cxx b/panda/src/downloader/decompressor.cxx index 8997a2367e..9aa99d4ab5 100644 --- a/panda/src/downloader/decompressor.cxx +++ b/panda/src/downloader/decompressor.cxx @@ -107,7 +107,7 @@ initiate(Filename &source_file, Filename &dest_file) { if (_initiated == true) { downloader_cat.error() - << "Decompressor::run() - Decompression has already been initiated" + << "Decompressor::initiate() - Decompression has already been initiated" << endl; return EU_error_abort; } @@ -117,7 +117,7 @@ initiate(Filename &source_file, Filename &dest_file) { _source_file.set_binary(); if (!_source_file.open_read(_read_stream)) { downloader_cat.error() - << "Decompressor::decompress() - Error opening source file: " + << "Decompressor::initiate() - Error opening source file: " << _source_file << " : " << strerror(errno) << endl; return get_write_error(); } @@ -127,7 +127,7 @@ initiate(Filename &source_file, Filename &dest_file) { _source_file_length = _read_stream.tellg(); if (_source_file_length == 0) { downloader_cat.warning() - << "Decompressor::decompress() - Zero length file: " + << "Decompressor::initiate() - Zero length file: " << source_file << " : " << strerror(errno) << endl; return get_write_error(); } @@ -137,7 +137,7 @@ initiate(Filename &source_file, Filename &dest_file) { dest_file.set_binary(); if (!dest_file.open_write(_write_stream)) { downloader_cat.error() - << "Decompressor::decompress() - Error opening dest file: " + << "Decompressor::initiate() - Error opening dest file: " << source_file << " : " << strerror(errno) << endl; return get_write_error(); } @@ -150,6 +150,49 @@ initiate(Filename &source_file, Filename &dest_file) { _source_buffer_length = 0; _initiated = true; _decompressor = new ZDecompressor(); + _decompress_to_ram = false; + return EU_success; +} + +//////////////////////////////////////////////////////////////////// +// Function: Decompressor::initiate +// Access: Public +// Description: +//////////////////////////////////////////////////////////////////// +int Decompressor:: +initiate(Ramfile &source_file) { + + if (_initiated == true) { + downloader_cat.error() + << "Decompressor::initiate() - Decompression has already been initiated" + << endl; + return EU_error_abort; + } + + // Open source file + _read_string_stream = new istringstream(source_file._data); + + // Determine source file length + _source_file_length = source_file._data.length(); + if (_source_file_length == 0) { + downloader_cat.warning() + << "Decompressor::initiate() - Zero length file: " + << strerror(errno) << endl; + return get_write_error(); + } + + // Open destination file + _write_string_stream = new ostringstream(); + + // Read from the source file into the first half of the buffer, + // decompress into the second half of the buffer, write the second + // half of the buffer to disk, and repeat. + _total_bytes_read = 0; + _read_all_input = false; + _source_buffer_length = 0; + _initiated = true; + _decompressor = new ZDecompressor(); + _decompress_to_ram = true; return EU_success; } @@ -172,7 +215,8 @@ cleanup(void) { _decompressor = NULL; _read_stream.close(); _write_stream.close(); - _source_file.unlink(); + if (_decompress_to_ram == false) + _source_file.unlink(); } //////////////////////////////////////////////////////////////////// @@ -191,12 +235,22 @@ run(void) { // See if there is anything left in the source file if (_read_all_input == false) { - _read_stream.read(_buffer->_buffer, _half_buffer_length); - _source_buffer_length = _read_stream.gcount(); - _total_bytes_read += _source_buffer_length; - if (_read_stream.eof()) { - nassertr(_total_bytes_read == _source_file_length, false); - _read_all_input = true; + if (_decompress_to_ram == false) { + _read_stream.read(_buffer->_buffer, _half_buffer_length); + _source_buffer_length = _read_stream.gcount(); + _total_bytes_read += _source_buffer_length; + if (_read_stream.eof()) { + nassertr(_total_bytes_read == _source_file_length, false); + _read_all_input = true; + } + } else { + _read_string_stream->read(_buffer->_buffer, _half_buffer_length); + _source_buffer_length = _read_string_stream->gcount(); + _total_bytes_read += _source_buffer_length; + if (_read_string_stream->eof()) { + nassertr(_total_bytes_read == _source_file_length, false); + _read_all_input = true; + } } } @@ -209,9 +263,15 @@ run(void) { nassertr(avail_out > 0 && avail_in > 0, false); while (avail_in > 0) { - int ret = _decompressor->decompress_to_stream(next_in, avail_in, + int ret; + if (_decompress_to_ram == false) + ret = _decompressor->decompress_to_stream(next_in, avail_in, next_out, avail_out, dest_buffer, dest_buffer_length, _write_stream); + else + ret = _decompressor->decompress_to_stream(next_in, avail_in, + next_out, avail_out, dest_buffer, + dest_buffer_length, *_write_string_stream); if (ret == ZCompressorBase::S_error) return EU_error_zlib; if ((int)_decompressor->get_total_in() == _source_file_length && @@ -243,3 +303,28 @@ decompress(Filename &source_file) { } return false; } + +//////////////////////////////////////////////////////////////////// +// Function: Decompressor::decompress +// Access: Public +// Description: +//////////////////////////////////////////////////////////////////// +bool Decompressor:: +decompress(Ramfile &source_file) { + int ret = initiate(source_file); + if (ret < 0) + return false; + for (;;) { + ret = run(); + if (ret == EU_success) { + source_file._data = _write_string_stream->str(); + delete _read_string_stream; + _read_string_stream = NULL; + delete _write_string_stream; + _write_string_stream = NULL; + return true; + } else if (ret < 0) + return false; + } + return false; +} diff --git a/panda/src/downloader/decompressor.h b/panda/src/downloader/decompressor.h index e5c8c36985..331a07c07e 100644 --- a/panda/src/downloader/decompressor.h +++ b/panda/src/downloader/decompressor.h @@ -27,9 +27,11 @@ PUBLISHED: int initiate(Filename &source_file); int initiate(Filename &source_file, Filename &dest_file); + int initiate(Ramfile &source_file); int run(void); bool decompress(Filename &source_file); + bool decompress(Ramfile &source_file); INLINE float get_progress(void) const; @@ -45,12 +47,15 @@ private: Filename _source_file; ifstream _read_stream; ofstream _write_stream; + istringstream *_read_string_stream; + ostringstream *_write_string_stream; int _source_file_length; int _total_bytes_read; bool _read_all_input; bool _handled_all_input; int _source_buffer_length; ZDecompressor *_decompressor; + bool _decompress_to_ram; }; #include "decompressor.I" diff --git a/panda/src/downloader/downloadDb.cxx b/panda/src/downloader/downloadDb.cxx index 0eb424a66d..bed5b63b1e 100644 --- a/panda/src/downloader/downloadDb.cxx +++ b/panda/src/downloader/downloadDb.cxx @@ -21,12 +21,10 @@ PN_uint32 DownloadDb::_magic_number = 0xfeedfeed; // Description: Create a download db with these client and server dbs //////////////////////////////////////////////////////////////////// DownloadDb:: -DownloadDb(Filename &server_file, Filename &client_file) { +DownloadDb(Ramfile &server_file, Filename &client_file) { _client_db = read_db(client_file); _client_db._filename = client_file; _server_db = read_db(server_file); - _server_db._filename = server_file; - } //////////////////////////////////////////////////////////////////// @@ -209,6 +207,30 @@ read_db(Filename &file) { return db; } +//////////////////////////////////////////////////////////////////// +// Function: DownloadDb:: +// Access: Public +// Description: +//////////////////////////////////////////////////////////////////// +DownloadDb::Db DownloadDb:: +read_db(Ramfile &file) { + // Open the multifile for reading + istringstream read_stream(file._data); + Db db; + + if (!db.read(read_stream)) { + downloader_cat.error() + << "DownloadDb::read() - Read failed" << endl; + return db; + } + if (!read_version_map(read_stream)) { + downloader_cat.error() + << "DownloadDb::read() - read_version_map() failed" << endl; + } + + return db; +} + //////////////////////////////////////////////////////////////////// // Function: DownloadDb:: // Access: Public @@ -630,7 +652,7 @@ parse_fr(uchar *start, int size) { // Description: //////////////////////////////////////////////////////////////////// bool DownloadDb::Db:: -read(ifstream &read_stream) { +read(istream &read_stream) { // Make a little buffer to read the header into uchar *header_buf = new uchar[_header_length]; @@ -986,7 +1008,7 @@ write_version_map(ofstream &write_stream) { // Description: //////////////////////////////////////////////////////////////////// bool DownloadDb:: -read_version_map(ifstream &read_stream) { +read_version_map(istream &read_stream) { _master_datagram.clear(); char *buffer = new char[sizeof(PN_uint64)]; read_stream.read(buffer, sizeof(PN_int32)); diff --git a/panda/src/downloader/downloadDb.h b/panda/src/downloader/downloadDb.h index 632d51f432..8765d960e9 100644 --- a/panda/src/downloader/downloadDb.h +++ b/panda/src/downloader/downloadDb.h @@ -20,6 +20,7 @@ #include #include "hashVal.h" +#include /* ////////////////////////////////////////////////// @@ -58,7 +59,7 @@ PUBLISHED: }; DownloadDb(void); - DownloadDb(Filename &server_file, Filename &client_file); + DownloadDb(Ramfile &server_file, Filename &client_file); ~DownloadDb(void); void output(ostream &out) const; @@ -155,7 +156,7 @@ public: int parse_record_header(uchar *start, int size); PT(MultifileRecord) parse_mfr(uchar *start, int size); PT(FileRecord) parse_fr(uchar *start, int size); - bool read(ifstream &read_stream); + bool read(istream &read_stream); bool write(ofstream &write_stream); Filename _filename; MultifileRecords _mfile_records; @@ -170,6 +171,7 @@ public: PUBLISHED: Db read_db(Filename &file); + Db read_db(Ramfile &file); bool write_db(Filename &file, Db db); public: @@ -191,7 +193,7 @@ PUBLISHED: protected: void write_version_map(ofstream &write_stream); - bool read_version_map(ifstream &read_stream); + bool read_version_map(istream &read_stream); VersionMap _versions; Datagram _master_datagram; }; diff --git a/panda/src/downloader/downloader.cxx b/panda/src/downloader/downloader.cxx index ab80605981..c782ba0cde 100644 --- a/panda/src/downloader/downloader.cxx +++ b/panda/src/downloader/downloader.cxx @@ -349,6 +349,55 @@ initiate(const string &file_name, Filename file_dest, _got_any_data = false; _initiated = true; _ever_initiated = true; + _download_to_ram = false; + return EU_success; +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::initiate +// Access: Published +// Description: Initiate the download of a file from a server. +//////////////////////////////////////////////////////////////////// +int Downloader:: +initiate(const string &file_name) { + if (_initiated == true) { + downloader_cat.error() + << "Downloader::initiate() - Download has already been initiated" + << endl; + return EU_error_abort; + } + + // Connect to the server + int connect_ret = connect_to_server(); + if (connect_ret < 0) + return connect_ret; + + // Send an HTTP request for the file to the server + string request = "GET "; + request += file_name; + request += " HTTP/1.1\012Host: "; + request += _server_name; + request += "\012Connection: close"; + request += "\012\012"; + int outlen = request.size(); + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::initiate() - Sending request:\n" << request << endl; + int send_ret = safe_send(_socket, request.c_str(), outlen, + (long)downloader_timeout); + if (send_ret < 0) + return send_ret; + + // Create a download status to maintain download progress information + _current_status = new DownloadStatus(_buffer->_buffer, 0, 0, 0, false); + + _tfirst = 0.0; + _tlast = 0.0; + _got_any_data = false; + _initiated = true; + _ever_initiated = true; + _download_to_ram = true; + _dest_string_stream = new ostringstream(); return EU_success; } @@ -395,6 +444,9 @@ run(void) { if (connect_ret < 0) return connect_ret; + if (_download_to_ram == true) + return run_to_ram(); + int ret = EU_ok; int write_ret; double t0 = _clock.get_real_time(); @@ -508,6 +560,124 @@ run(void) { return ret; } +//////////////////////////////////////////////////////////////////// +// Function: Downloader::run_to_ram +// Access: Private +// Description: +//////////////////////////////////////////////////////////////////// +int Downloader:: +run_to_ram(void) { + int ret = EU_ok; + int write_ret; + + double t0 = _clock.get_real_time(); + if (_tfirst == 0.0) { + _tfirst = t0; + } + if (t0 - _tlast < _frequency) + return EU_ok; + + // Recompute the buffer size if necessary + if (_recompute_buffer == true) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::run_to_ram() - Recomputing the buffer" << endl; + + // Flush the current buffer if it holds any data + if (_current_status->_bytes_in_buffer > 0) { + write_ret = write_to_ram(_current_status); + if (write_ret < 0) + return write_ret; + } + + // Allocate a new buffer + _buffer.clear(); + _receive_size = (ulong)(_frequency * _byte_rate); + _disk_buffer_size = _receive_size * _disk_write_frequency; + _buffer = new Buffer(_disk_buffer_size); + _current_status->_buffer = _buffer->_buffer; + _current_status->reset(); + // Reset the flag + _recompute_buffer = false; + // Reset the statistics + _tfirst = t0; + _current_status->_total_bytes = 0; + + } else if (_current_status->_bytes_in_buffer + _receive_size > + _disk_buffer_size) { + + // Flush the current buffer if the next request would overflow it + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::run_to_ram() - Flushing buffer" << endl; + write_ret = write_to_ram(_current_status); + if (write_ret < 0) + return write_ret; + } + + // Attempt to receive the bytes from the socket + int fret; + // Handle the case of a fast connection + if (_receive_size > MAX_RECEIVE_BYTES) { + int repeat = (int)(_receive_size / MAX_RECEIVE_BYTES); + int remain = (int)fmod((double)_receive_size, (double)MAX_RECEIVE_BYTES); + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::run_to_ram() - fast connection - repeat: " << repeat + << " remain: " << remain << endl; + // Make multiple requests at once but do not exceed MAX_RECEIVE_BYTES + // for any single request + for (int i = 0; i <= repeat; i++) { + if (i < repeat) + fret = fast_receive(_socket, _current_status, MAX_RECEIVE_BYTES); + else if (remain > 0) + fret = fast_receive(_socket, _current_status, remain); + if (fret == EU_eof || fret < 0) { + break; + } else if (fret == EU_success) { + _got_any_data = true; + } + } + } else { // Handle the normal speed connection case + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::run_to_ram() - normal connection" << endl; + fret = fast_receive(_socket, _current_status, _receive_size); + } + _tlast = _clock.get_real_time(); + + // Check for end of file + if (fret == EU_eof) { + if (_got_any_data == true) { + if (_current_status->_bytes_in_buffer > 0) { + write_ret = write_to_ram(_current_status); + if (write_ret < 0) + return write_ret; + } + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::run_to_ram() - Got eof" << endl; + cleanup(); + return EU_success; + } else { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::run_to_ram() - Got 0 bytes" << endl; + return ret; + } + } else if (fret == EU_network_no_data) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::run_to_ram() - No data" << endl; + return ret; + } else if (fret < 0) { + return fret; + } + + _got_any_data = true; + return ret; +} + //////////////////////////////////////////////////////////////////// // Function: Downloader::parse_http_response // Access: Private @@ -710,6 +880,60 @@ write_to_disk(DownloadStatus *status) { return EU_success; } +//////////////////////////////////////////////////////////////////// +// Function: Downloader::write_to_ram +// Access: Private +// Description: Writes a download to memory. If there is a header, +// the pointer and size are adjusted so the header +// is excluded. Function returns false on error +// condition. +//////////////////////////////////////////////////////////////////// +int Downloader:: +write_to_ram(DownloadStatus *status) { + nassertr(status != NULL, EU_error_abort); + + // Ensure the header has been parsed successfully first + int parse_ret = parse_header(status); + if (parse_ret < 0) + return parse_ret; + + if (status->_header_is_complete == false) { + downloader_cat.error() + << "Downloader::write_to_ram() - Incomplete HTTP header - " + << "(or header was larger than download buffer) - " + << "try increasing download-buffer-size" << endl; + return EU_error_abort; + } + + // Write what we have so far to memory + if (status->_bytes_in_buffer > 0) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::write_to_ram() - Writing " + << status->_bytes_in_buffer << " to memory" << endl; + + _dest_string_stream->write(status->_start, status->_bytes_in_buffer); + status->_total_bytes_written += status->_bytes_in_buffer; + } + + status->reset(); + + return EU_success; +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::get_ramfile +// Access: Published +// Description: +//////////////////////////////////////////////////////////////////// +bool Downloader:: +get_ramfile(Ramfile &rfile) { + rfile._data = _dest_string_stream->str(); + delete _dest_string_stream; + _dest_string_stream = NULL; + return true; +} + //////////////////////////////////////////////////////////////////// // Function: Downloader::DownloadStatus::constructor // Access: Private diff --git a/panda/src/downloader/downloader.h b/panda/src/downloader/downloader.h index 32b43b6c0b..c348dc0a53 100644 --- a/panda/src/downloader/downloader.h +++ b/panda/src/downloader/downloader.h @@ -41,8 +41,11 @@ PUBLISHED: int initiate(const string &file_name, Filename file_dest, int first_byte, int last_byte, int total_bytes, bool partial_content = true); + int initiate(const string &file_name); int run(void); + bool get_ramfile(Ramfile &rfile); + INLINE void set_frequency(float frequency); INLINE float get_frequency(void) const; INLINE void set_byte_rate(float bytes); @@ -82,6 +85,8 @@ private: int parse_http_response(const string &resp); int parse_header(DownloadStatus *status); int write_to_disk(DownloadStatus *status); + int run_to_ram(void); + int write_to_ram(DownloadStatus *status); void cleanup(void); @@ -101,11 +106,13 @@ private: ulong _receive_size; int _disk_buffer_size; ofstream _dest_stream; + ostringstream *_dest_string_stream; bool _recompute_buffer; DownloadStatus *_current_status; bool _got_any_data; int _total_bytes_written; + bool _download_to_ram; double _tlast; double _tfirst; diff --git a/panda/src/downloader/zcompressor.cxx b/panda/src/downloader/zcompressor.cxx index 27ec93d9b4..fe3199df65 100644 --- a/panda/src/downloader/zcompressor.cxx +++ b/panda/src/downloader/zcompressor.cxx @@ -142,7 +142,7 @@ decompress(char *&next_in, int &avail_in, char *&next_out, int &avail_out, int ZDecompressor:: decompress_to_stream(char *&next_in, int &avail_in, char *&next_out, int &avail_out, char *out_buffer, - int out_buffer_length, ofstream &write_stream, + int out_buffer_length, ostream &write_stream, bool finish) { int ret = decompress(next_in, avail_in, next_out, avail_out, finish); if (ret == S_error) diff --git a/panda/src/downloader/zcompressor.h b/panda/src/downloader/zcompressor.h index 8c6d6309f5..dd3ca7f525 100644 --- a/panda/src/downloader/zcompressor.h +++ b/panda/src/downloader/zcompressor.h @@ -70,7 +70,7 @@ public: int &avail_out, bool finish = false); int decompress_to_stream(char *&next_in, int &avail_in, char *&next_out, int &avail_out, char *out_buffer, int out_buffer_length, - ofstream &write_stream, bool finish = false); + ostream &write_stream, bool finish = false); }; #include "zcompressor.I" diff --git a/panda/src/downloadertools/Sources.pp b/panda/src/downloadertools/Sources.pp index 5e51634ae0..9974745b3f 100644 --- a/panda/src/downloadertools/Sources.pp +++ b/panda/src/downloadertools/Sources.pp @@ -84,6 +84,8 @@ #begin bin_target #define TARGET test_downloader + #define TARGET_IF_ZLIB yes + #define USE_ZLIB yes #define SOURCES \ test_downloader.cxx diff --git a/panda/src/downloadertools/test_downloader.cxx b/panda/src/downloadertools/test_downloader.cxx index 8324976047..2229de18c7 100644 --- a/panda/src/downloadertools/test_downloader.cxx +++ b/panda/src/downloadertools/test_downloader.cxx @@ -1,33 +1,40 @@ #include #include #include +#include int main(int argc, char *argv[]) { - if (argc < 4) { - cerr << "Usage: test_downloader " + //if (argc < 4) { + if (argc < 3) { + //cerr << "Usage: test_downloader " + cerr << "Usage: test_downloader " << endl; return 0; } string server_name = argv[1]; Filename src_file = argv[2]; - Filename dest_file = argv[3]; + //Filename dest_file = argv[3]; Downloader dl; if (dl.connect_to_server(server_name) == false) return 0; - int ret = dl.initiate(src_file, dest_file); + //int ret = dl.initiate(src_file, dest_file); + int ret = dl.initiate(src_file); if (ret < 0) return 0; - for (;;) { + //for (;;) { + bool done = false; + while (!done) { ret = dl.run(); if (ret == EU_success) { cerr << "bytes per second: " << dl.get_bytes_per_second() << endl; - return 1; + //return 1; + done = true; } else if (ret == EU_write) { cerr << "bytes per second: " << dl.get_bytes_per_second() << endl; } else if (ret < 0) { @@ -35,5 +42,14 @@ main(int argc, char *argv[]) { return 0; } } + + cerr << "download to memory complete" << endl; + Ramfile rfile; + dl.get_ramfile(rfile); + cerr << "ram file length: " << rfile._data.length() << endl; + Decompressor dc; + dc.decompress(rfile); + cerr << "ram file length: " << rfile._data.length() << endl; + return 0; } diff --git a/panda/src/express/buffer.I b/panda/src/express/buffer.I index 93ed07d0b7..eada2d41eb 100644 --- a/panda/src/express/buffer.I +++ b/panda/src/express/buffer.I @@ -12,3 +12,12 @@ INLINE int Buffer:: get_length(void) const { return _length; } + +//////////////////////////////////////////////////////////////////// +// Function: Ramfile::constructor +// Access: Published +// Description: +//////////////////////////////////////////////////////////////////// +INLINE Ramfile:: +Ramfile(void) { +} diff --git a/panda/src/express/buffer.h b/panda/src/express/buffer.h index 7cdcd3120d..4b1529b67b 100644 --- a/panda/src/express/buffer.h +++ b/panda/src/express/buffer.h @@ -34,6 +34,18 @@ private: int _length; }; +//////////////////////////////////////////////////////////////////// +// Class : Ramfile +// Description : +//////////////////////////////////////////////////////////////////// +class EXPCL_PANDAEXPRESS Ramfile { +PUBLISHED: + INLINE Ramfile(void); + +public: + string _data; +}; + #include "buffer.I" #endif