From 0b943499d1e42a53dc464ef21dd26d2f40316e6d Mon Sep 17 00:00:00 2001 From: Mike Goslin Date: Wed, 13 Dec 2000 21:19:43 +0000 Subject: [PATCH] *** empty log message *** --- panda/src/downloader/Sources.pp | 4 +- .../{download.I => asyncDownloader.I} | 115 +- panda/src/downloader/asyncDownloader.cxx | 1033 +++++++++++++++++ panda/src/downloader/asyncDownloader.h | 137 +++ panda/src/downloader/download.cxx | 653 ----------- panda/src/downloader/download.h | 119 -- panda/src/downloader/downloader.I | 93 +- panda/src/downloader/downloader.cxx | 850 ++++---------- panda/src/downloader/downloader.h | 100 +- 9 files changed, 1551 insertions(+), 1553 deletions(-) rename panda/src/downloader/{download.I => asyncDownloader.I} (53%) create mode 100644 panda/src/downloader/asyncDownloader.cxx create mode 100644 panda/src/downloader/asyncDownloader.h delete mode 100644 panda/src/downloader/download.cxx delete mode 100644 panda/src/downloader/download.h diff --git a/panda/src/downloader/Sources.pp b/panda/src/downloader/Sources.pp index ea831c0563..babe03fd6a 100644 --- a/panda/src/downloader/Sources.pp +++ b/panda/src/downloader/Sources.pp @@ -8,7 +8,6 @@ #define TARGET downloader #define SOURCES \ - asyncUtility.I asyncUtility.cxx asyncUtility.h \ config_downloader.cxx \ config_downloader.h downloadDb.I \ downloadDb.cxx downloadDb.h \ @@ -17,8 +16,7 @@ multiplexStream.I multiplexStream.cxx multiplexStream.h \ multiplexStreamBuf.I multiplexStreamBuf.cxx multiplexStreamBuf.h \ patcher.cxx \ - patcher.h \ - download.I download.cxx download.h + patcher.h #define IF_ZLIB_SOURCES \ decompressor.cxx decompressor.h zcompressor.I zcompressor.cxx \ diff --git a/panda/src/downloader/download.I b/panda/src/downloader/asyncDownloader.I similarity index 53% rename from panda/src/downloader/download.I rename to panda/src/downloader/asyncDownloader.I index 6381d73a96..850c6dd292 100644 --- a/panda/src/downloader/download.I +++ b/panda/src/downloader/asyncDownloader.I @@ -1,4 +1,4 @@ -// Filename: download.I +// Filename: downloader.I // Created by: mike (09Jan97) // //////////////////////////////////////////////////////////////////// @@ -6,97 +6,92 @@ #include "config_downloader.h" //////////////////////////////////////////////////////////////////// -// Function: Download::set_frequency -// Access: Public -// Description: -//////////////////////////////////////////////////////////////////// -INLINE void Download:: -set_frequency(float frequency) { - nassertv(frequency > 0.0); - if (_frequency != frequency) { - _frequency = frequency; - _recompute_buffer = true; - } -} - -//////////////////////////////////////////////////////////////////// -// Function: Download::get_frequency -// Access: Public -// Description: -//////////////////////////////////////////////////////////////////// -INLINE float Download:: -get_frequency(void) const { - return _frequency; -} - -//////////////////////////////////////////////////////////////////// -// Function: Download::set_byte_rate +// Function: Downloader::set_byte_rate // Access: Public // Description: Note: modem speeds are reported in bits, so you // need to convert! //////////////////////////////////////////////////////////////////// -INLINE void Download:: +INLINE void Downloader:: set_byte_rate(float bytes) { nassertv(bytes > 0.0); - if (_byte_rate != bytes) { - _byte_rate = bytes; - _recompute_buffer = true; - } + if (bytes == _byte_rate) + return; +#ifdef HAVE_IPC + _buffer_lock.lock(); +#endif + + _new_byte_rate = bytes; + +#ifdef HAVE_IPC + _buffer_lock.unlock(); +#endif } //////////////////////////////////////////////////////////////////// -// Function: Download::get_byte_rate +// Function: Downloader::get_byte_rate // Access: Public // Description: Returns byte rate in bytes. //////////////////////////////////////////////////////////////////// -INLINE float Download:: +INLINE float Downloader:: get_byte_rate(void) const { return _byte_rate; } //////////////////////////////////////////////////////////////////// -// Function: Download::set_disk_write_frequency +// Function: Downloader::enable_download // Access: Public // Description: //////////////////////////////////////////////////////////////////// -INLINE void Download:: -set_disk_write_frequency(int frequency) { - nassertv(frequency > 0); - if (_disk_write_frequency != frequency) { - _disk_write_frequency = frequency; - _recompute_buffer = true; - } +INLINE void Downloader:: +enable_download(bool val) { + _download_enabled = val; } //////////////////////////////////////////////////////////////////// -// Function: Download::get_disk_write_frequency +// Function: Downloader::is_download_enabled // Access: Public // Description: //////////////////////////////////////////////////////////////////// -INLINE int Download:: +INLINE bool Downloader:: +is_download_enabled(void) const { + return _download_enabled; +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::set_disk_write_frequency +// Access: Public +// Description: +//////////////////////////////////////////////////////////////////// +INLINE void Downloader:: +set_disk_write_frequency(int frequency) { + nassertv(frequency > 0); +#ifdef HAVE_IPC + _buffer_lock.lock(); +#endif + + _new_disk_write_frequency = frequency; + +#ifdef HAVE_IPC + _buffer_lock.unlock(); +#endif +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::get_disk_write_frequency +// Access: Public +// Description: +//////////////////////////////////////////////////////////////////// +INLINE int Downloader:: get_disk_write_frequency(void) const { return _disk_write_frequency; } //////////////////////////////////////////////////////////////////// -// Function: Download::get_bytes_written +// Function: Downloader::get_last_attempt_stalled // Access: Public // Description: //////////////////////////////////////////////////////////////////// -INLINE int Download:: -get_bytes_written(void) const { - nassertr(_current_status != NULL, 0); - return _current_status->_total_bytes_written; -} - -//////////////////////////////////////////////////////////////////// -// Function: Download::get_bytes_per_second -// Access: Public -// Description: -//////////////////////////////////////////////////////////////////// -INLINE float Download:: -get_bytes_per_second(void) const { - nassertr(_tlast - _tfirst > 0.0, 0.0); - nassertr(_current_status != NULL, 0.0); - return (float)((double)_current_status->_total_bytes / (_tlast - _tfirst)); +INLINE bool Downloader:: +get_last_attempt_stalled(void) const { + return _last_attempt_stalled; } diff --git a/panda/src/downloader/asyncDownloader.cxx b/panda/src/downloader/asyncDownloader.cxx new file mode 100644 index 0000000000..5ebe7abdd8 --- /dev/null +++ b/panda/src/downloader/asyncDownloader.cxx @@ -0,0 +1,1033 @@ +// Filename: downloader.cxx +// Created by: mike (09Jan97) +// +//////////////////////////////////////////////////////////////////// +// +//////////////////////////////////////////////////////////////////// +// Includes +//////////////////////////////////////////////////////////////////// +#include "asyncDownloader.h" +#include "config_downloader.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if !defined(WIN32_VC) +// #define errno wsaGetLastError() + #include + #include + #include + #include +#endif + +//////////////////////////////////////////////////////////////////// +// Defines +//////////////////////////////////////////////////////////////////// +enum send_status { + SS_error, + SS_timeout, + SS_success +}; + +enum receive_status { + RS_error, + RS_timeout, + RS_success, + RS_eof +}; + +//////////////////////////////////////////////////////////////////// +// Class : DownloaderToken +// Description : Holds a request for the downloader. +//////////////////////////////////////////////////////////////////// +class DownloaderToken : public ReferenceCount { +public: + INLINE DownloaderToken(uint id, const string &file_name, + const Filename &file_dest, const string &event_name, + int first_byte, int last_byte, int total_bytes, + bool partial_content, bool sync) : _id(id), _first_byte(first_byte), + _last_byte(last_byte), _total_bytes(total_bytes) { + _file_name = file_name; + _event_name = event_name; + _file_dest = file_dest; + _partial_content = partial_content; + _sync = sync; + } + uint _id; + string _file_name; + Filename _file_dest; + string _event_name; + int _first_byte; + int _last_byte; + int _total_bytes; + bool _partial_content; + bool _sync; +}; + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::Constructor +// Access: Public +// Description: +//////////////////////////////////////////////////////////////////// +Downloader:: +Downloader(void) : AsyncUtility() { + init(); +} + +#if 0 +//////////////////////////////////////////////////////////////////// +// Function: Downloader::Constructor +// Access: Public +// Description: +//////////////////////////////////////////////////////////////////// +Downloader:: +Downloader(PT(Buffer) buffer) : AsyncUtility() { + init(buffer); +} +#endif + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::init +// Access: Private +// Description: +//////////////////////////////////////////////////////////////////// +void Downloader:: +init(void) { + _disk_write_frequency = downloader_disk_write_frequency; + _new_disk_write_frequency = 0; + _byte_rate = downloader_byte_rate; + _new_byte_rate = 0; + _frequency = downloader_frequency; + nassertv(_frequency > 0 && _byte_rate > 0); + _read_size = _byte_rate * _frequency; + _disk_buffer_size = _disk_write_frequency * _read_size; + _buffer = new Buffer(_disk_buffer_size); + _connected = false; + _token_board = new DownloaderTokenBoard; + _download_enabled = true; + _last_attempt_stalled = true; + // We need to flush after every write in case we're interrupted + _dest_stream.setf(ios::unitbuf, 0); + _last_attempt_stalled = false; + _current_attempt_stalled = false; + +#if defined(WIN32) + WSAData mydata; + int answer1 = WSAStartup(0x0101, &mydata); + if(answer1 != 0) { + downloader_cat.error() + << "Downloader::Downloader() - Error initializing TCP stack!" + << endl; + } +#endif +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::Destructor +// Access: Public +// Description: +//////////////////////////////////////////////////////////////////// +Downloader:: +~Downloader() { + if (_connected) + disconnect_from_server(); + + destroy_thread(); + + delete _token_board; +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::connect_to_server +// Access: Public +// Description: +//////////////////////////////////////////////////////////////////// +bool Downloader:: +connect_to_server(const string &name, uint port) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader connecting to server: " << name + << " on port: " << port << endl; + + _server_name = name; + + _sin.sin_family = PF_INET; + _sin.sin_port = htons(port); + ulong addr = (ulong)inet_addr(name.c_str()); + struct hostent *hp = NULL; + + if (addr == INADDR_NONE) { + hp = gethostbyname(name.c_str()); + if (hp != NULL) + (void)memcpy(&_sin.sin_addr, hp->h_addr, (uint)hp->h_length); + else { + downloader_cat.error() + << "Downloader::connect_to_server() - gethostbyname() failed: " + << strerror(errno) << endl; + return false; + } + } else + (void)memcpy(&_sin.sin_addr, &addr, sizeof(addr)); + + return connect_to_server(); +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::connect_to_server +// Access: Private +// Description: +//////////////////////////////////////////////////////////////////// +bool Downloader:: +connect_to_server(void) { + if (_connected == true) + return true; + + _socket = 0xffffffff; + _socket = socket(PF_INET, SOCK_STREAM, 0); + if (_socket == (int)0xffffffff) { + downloader_cat.error() + << "Downloader::connect_to_server() - socket failed: " + << strerror(errno) << endl; + return false; + } + + _connected = true; + + if (connect(_socket, (struct sockaddr *)&_sin, sizeof(_sin)) < 0) { + downloader_cat.error() + << "Downloader::connect_to_server() - connect() failed: " + << strerror(errno) << endl; + disconnect_from_server(); + _connected = false; + } + + return _connected; +} + +/////////////////////////////////////////////////////////////////// +// Function: Downloader::disconnect_from_server +// Access: Public +// Description: +//////////////////////////////////////////////////////////////////// +void Downloader:: +disconnect_from_server(void) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader disconnecting from server..." << endl; + +#if defined(WIN32) + (void)closesocket(_socket); +#else + (void)close(_socket); +#endif + + _connected = false; +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::request_sync_download +// Access: Public +// Description: Requests the synchronous download of a complete file. +//////////////////////////////////////////////////////////////////// +int Downloader:: +request_sync_download(const string &file_name, const Filename &file_dest, + const string &event_name) { + return request_download(file_name, file_dest, event_name, true); +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::request_sync_download +// Access: Public +// Description: Requests the synchronous download of a complete file. +//////////////////////////////////////////////////////////////////// +int Downloader:: +request_sync_download(const string &file_name, const Filename &file_dest, + const string &event_name, int first_byte, + int last_byte, int total_bytes, bool partial_content) { + return request_download(file_name, file_dest, event_name, first_byte, + last_byte, total_bytes, partial_content, true); +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::request_download +// Access: Public +// Description: Requests the download of a complete file. +//////////////////////////////////////////////////////////////////// +int Downloader:: +request_download(const string &file_name, const Filename &file_dest, + const string &event_name, bool sync) { + return request_download(file_name, file_dest, event_name, 0, 0, 0, + false, sync); +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::request_download +// Access: Public +// Description: Requests an asynchronous load of a file. The request +// will be queued and served by the asynchronous thread. +// If event_name is nonempty, it is the name of the +// event that will be thrown (with the uint id as its +// single parameter) when the loading is completed later. +// +// The return value is an integer which can be used to +// identify this particular request later to +// fetch_load(), or 0 if there has been an error. +// +// Can be used to request a partial download of a file. +//////////////////////////////////////////////////////////////////// +int Downloader:: +request_download(const string &file_name, const Filename &file_dest, + const string &event_name, int first_byte, + int last_byte, int total_bytes, + bool partial_content, bool sync) { + + nassertr(first_byte <= last_byte && last_byte <= total_bytes, 0); + + PT(DownloaderToken) tok; + if (_threads_enabled) { + + // Make sure we actually are threaded + if (!_threaded) { + downloader_cat.info() + << "Downloader::request_download() - create_thread() was " + << "never called! Calling it now..." << endl; + create_thread(); + } + + // We need to grab the lock in order to signal the condition variable +#ifdef HAVE_IPC + _lock.lock(); +#endif + + if (_token_board->_waiting.is_full()) { + downloader_cat.error() + << "Downloader::request_download() - Too many pending requests\n"; + return 0; + } + + if (downloader_cat.is_debug()) { + downloader_cat.debug() + << "Download requested for file: " << file_name << "\n"; + } + + tok = new DownloaderToken(_next_token++, file_name, file_dest, + event_name, first_byte, last_byte, total_bytes, + partial_content, sync); + _token_board->_waiting.insert(tok); + +#ifdef HAVE_IPC + _request_cond->signal(); + _lock.unlock(); +#endif + + } else { + // If we're not running asynchronously, process the load request + // directly now. + if (_token_board->_waiting.is_full()) { + downloader_cat.error() + << "Downloader::request_download() - Too many pending requests\n"; + return 0; + } + if (downloader_cat.is_debug()) { + downloader_cat.debug() + << "Load requested for file: " << file_name << "\n"; + } + + tok = new DownloaderToken(_next_token++, file_name, file_dest, + event_name, first_byte, last_byte, total_bytes, + partial_content, sync); + _token_board->_waiting.insert(tok); + process_request(); + } + + return tok->_id; +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::process_request +// Access: Private +// Description: Serves any requests on the token board, moving them +// to the done queue. +//////////////////////////////////////////////////////////////////// +bool Downloader:: +process_request() { + if (_shutdown) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader shutting down...\n"; + return false; + } + + // If there is actually a request token - process it + while (!_token_board->_waiting.is_empty()) { + PT(DownloaderToken) tok = _token_board->_waiting.extract(); + int ret = download(tok->_file_name, tok->_file_dest, tok->_event_name, + tok->_first_byte, tok->_last_byte, tok->_total_bytes, + tok->_partial_content, tok->_sync, tok->_id); + nassertr(tok->_event_name.empty() == false, false); + PT_Event return_event = new Event(tok->_event_name); + return_event->add_parameter(EventParameter((int)tok->_id)); + if (ret == DS_success) { + _token_board->_done.insert(tok); + return_event->add_parameter(EventParameter(DS_success)); + + // Throw a "done" event now. + if (!tok->_event_name.empty()) { + PT_Event done = new Event(tok->_event_name); + done->add_parameter(EventParameter((int)tok->_id)); + throw_event(done); + } + + if (downloader_cat.is_debug()) { + downloader_cat.debug() + << "Downloader::process_request() - downloading complete for " + << tok->_file_name << "\n"; + } + } else { + return_event->add_parameter(EventParameter(ret)); + } + throw_event(return_event); + } + + return true; +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::safe_send +// Access: Private +// Description: +//////////////////////////////////////////////////////////////////// +int Downloader:: +safe_send(int socket, const char *data, int length, long timeout) { + if (length == 0) { + downloader_cat.error() + << "Downloader::safe_send() - requested 0 length send!" << endl; + return SS_error; + } + int bytes = 0; + struct timeval tv; + tv.tv_sec = timeout; + tv.tv_usec = 0; + fd_set wset; + FD_ZERO(&wset); + while (bytes < length) { + FD_SET(socket, &wset); + int sret = select(socket + 1, NULL, &wset, NULL, &tv); + if (sret == 0) { + downloader_cat.error() + << "Downloader::safe_send() - select timed out after: " + << timeout << " seconds" << endl; + return SS_timeout; + } else if (sret == -1) { + downloader_cat.error() + << "Downloader::safe_send() - error: " << strerror(errno) << endl; + return SS_error; + } + int ret = send(socket, data, length, 0); + if (ret > 0) + bytes += ret; + else { + downloader_cat.error() + << "Downloader::safe_send() - error: " << strerror(errno) << endl; + return SS_error; + } + } + return SS_success; +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::safe_receive +// Access: Private +// Description: +//////////////////////////////////////////////////////////////////// +int Downloader:: +safe_receive(int socket, DownloadStatus &status, int length, + long timeout, int &bytes) { + bytes = 0; + if (length == 0) { + downloader_cat.error() + << "Downloader::safe_receive() - requested 0 length receive!" << endl; + return RS_error; + } + struct timeval tv; + tv.tv_sec = timeout; + tv.tv_usec = 0; + fd_set rset; + FD_ZERO(&rset); + while (bytes < length) { + FD_SET(socket, &rset); + int sret = select(socket + 1, &rset, NULL, NULL, &tv); + if (sret == 0) { + downloader_cat.warning() + << "Downloader::safe_receive() - select timed out after: " + << timeout << " seconds" << endl; + return RS_timeout; + } else if (sret == -1) { + downloader_cat.error() + << "Downloader::safe_receive() - error: " << strerror(errno) << endl; + return RS_error; + } + int ret = recv(socket, status._next_in, length - bytes, 0); + if (ret > 0) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::safe_receive() - recv() got: " << ret << " bytes" + << endl; + bytes += ret; + status._next_in += ret; + status._bytes_in_buffer += ret; + if (bytes < length) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::safe_receive() - Download stalled" << endl; + _current_attempt_stalled = true; + } + } else if (ret == 0) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::safe_receive() - End of file" << endl; + return RS_eof; + } else { + downloader_cat.error() + << "Downloader::safe_receive() - error: " << strerror(errno) << endl; + return RS_error; + } + } + nassertr(bytes == length, RS_error); + return RS_success; +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::attempt_read +// Access: Private +// Description: +//////////////////////////////////////////////////////////////////// +int Downloader:: +attempt_read(int length, DownloadStatus &status, int &bytes_read) { + + bytes_read = 0; + for (int i = 0; i < downloader_timeout_retries; i++) { + + // Ensure we have enough room in the buffer to download length bytes + // If we don't have enough room, write the buffer to disk + if (status._bytes_in_buffer + length > _disk_buffer_size) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::attempt_read() - Flushing buffer" << endl; + + if (write_to_disk(status) == false) + return RS_error; + } + + // Make the request for length bytes + int bytes; + int ans = safe_receive(_socket, status, length, + (long)downloader_timeout, bytes); + bytes_read += bytes; + + switch (ans) { + case RS_error: + case RS_eof: + return ans; + case RS_timeout: + // Try again + break; + case RS_success: + nassertr(bytes == length, RS_error); + return RS_success; + default: + downloader_cat.error() + << "Downloader::attempt_read() - unknown return condition " + << "from safe_receive() : " << ans << endl; + return RS_error; + } + } + + // We timed out on retries consecutive attempts - this is considered + // a true timeout + return RS_timeout; +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::download +// Access: Private +// Description: +//////////////////////////////////////////////////////////////////// +int Downloader:: +download(const string &file_name, Filename file_dest, + const string &event_name, int first_byte, int last_byte, + int total_bytes, bool partial_content, bool sync, uint id) { + + if (_download_enabled == false) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::download() - downloading is disabled" << endl; + return DS_abort; + } + + // Make sure we are still connected to the server + if (connect_to_server() == false) + return DS_abort; + + // Attempt to open the destination file + file_dest.set_binary(); + bool result; + if (partial_content == true && first_byte > 0) + result = file_dest.open_append(_dest_stream); + else + result = file_dest.open_write(_dest_stream); + if (result == false) { + downloader_cat.error() + << "Downloader::download() - Error opening file: " << file_dest + << " for writing" << endl; + return DS_abort; + } + + // Send an HTTP request for the file to the server + string request = "GET "; + request += file_name; + request += " HTTP/1.1\012Host: "; + request += _server_name; + request += "\012Connection: close"; + if (partial_content == true) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::download() - Requesting byte range: " << first_byte + << "-" << last_byte << endl; + request += "\012Range: bytes="; + stringstream start_stream; + start_stream << first_byte << "-" << last_byte; + request += start_stream.str(); + } + request += "\012\012"; + int outlen = request.size(); + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::download() - Sending request:\n" << request << endl; + int send_ret = safe_send(_socket, request.c_str(), outlen, + (long)downloader_timeout); + + // Handle timeouts on the send + if (send_ret == SS_timeout) { + for (int sr = 0; sr < downloader_timeout_retries; sr++) { + send_ret = safe_send(_socket, request.c_str(), outlen, + (long)downloader_timeout); + if (send_ret != SS_timeout) + break; + } + if (send_ret == SS_timeout) { + // We've really timed out - throw an event + downloader_cat.error() + << "Downloader::download() - send timed out after: " + << downloader_timeout_retries << " retries" << endl; + return DS_timeout; + } + } + + if (send_ret == SS_error) + return DS_abort; + + // Create a download status to maintain download progress information + DownloadStatus status(_buffer->_buffer, event_name, first_byte, last_byte, + total_bytes, partial_content, id); + bool got_any_data = false; + + // Loop at the requested frequency until the download completes + for (;;) { + bool resize_buffer = false; + + // Ensure that these don't change while we're computing read_size +#ifdef HAVE_IPC + _buffer_lock.lock(); +#endif + + nassertr(_frequency > 0, DS_abort); + // If byte rate has changed, recompute read size and write buffer size + if (_new_byte_rate > 0) { + _read_size = (int)ceil(_new_byte_rate * _frequency); + _byte_rate = _new_byte_rate; + _new_byte_rate = 0; + resize_buffer = true; + } + + // If the disk write frequency has changed, compute a new buffer size + if (_new_disk_write_frequency > 0) { + _disk_write_frequency = _new_disk_write_frequency; + _new_disk_write_frequency = 0; + resize_buffer = true; + } + + if (resize_buffer == true) { + // Flush the write buffer before resizing it + if (status._bytes_in_buffer > 0) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::download() - Flushing buffer" << endl; + + if (write_to_disk(status) == false) { + downloader_cat.error() + << "Downloader::download() - failed to flush buffer during " + << "resize" << endl; + return DS_abort; + } + } + + // Resize the buffer + _disk_buffer_size = (_disk_write_frequency * _read_size); + + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::download() - resizing disk buffer to: " + << _disk_buffer_size << endl; + _buffer.clear(); + downloader_cat.debug() + << "Downloader::download() - buffer cleared" << endl; + _buffer = new Buffer(_disk_buffer_size); + // Update the status with the new buffer + status._buffer = _buffer->_buffer; + status.reset(); + downloader_cat.debug() + << "Downloader::download() - new buffer created" << endl; + } + +#ifdef HAVE_IPC + _buffer_lock.unlock(); +#endif + + // Attempt to read + int bytes_read; + + int ret = attempt_read(_read_size, status, bytes_read); + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::download() - stalled status: " << _current_attempt_stalled + << endl; + + _last_attempt_stalled = _current_attempt_stalled; + _current_attempt_stalled = false; + + if (bytes_read > 0) + got_any_data = true; + + switch (ret) { + case RS_error: + + downloader_cat.error() + << "Downloader::download() - Error reading from socket: " + << strerror(errno) << endl; + return DS_abort; + + case RS_timeout: + + { + // We've really timed out - throw an event + downloader_cat.error() + << "Downloader::download() - receive timed out after: " + << downloader_timeout_retries << " retries" << endl; + if (bytes_read > 0) { + if (write_to_disk(status) == false) { + downloader_cat.error() + << "Downloader::download() - write to disk failed after " + << "timeout!" << endl; + return DS_abort; + } + } + return DS_timeout; + } + + case RS_success: + + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::download() - Got: " << bytes_read << " bytes" + << endl; + break; + + case RS_eof: + + { + // We occasionally will get 0 bytes on the first attempt - we + // don't want to treat this as end of file in any case + if (got_any_data == true) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Download for: " << file_name << " completed" << endl; + bool ret = true; + if (bytes_read > 0) + ret = write_to_disk(status); + _dest_stream.close(); + + // The "Connection: close" line tells server to close connection + // when the download is complete + _connected = false; + if (ret == false) + return DS_abort; + return DS_success; + } else { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::download() - Received 0 bytes" << endl; + } + } + break; + + default: + + downloader_cat.error() + << "Downloader::download() - Unknown return value from " + << "attempt_read() : " << ret << endl; + return DS_abort; + + } // switch(ret) + + // Sleep for the requested frequency + nap(); + + } // for (;;) + + downloader_cat.error() + << "Downloader::download() - Dropped out of for loop without returning!" + << endl; + return DS_abort; +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::parse_http_response +// Access: Private +// Description: Check the HTTP response from the server +//////////////////////////////////////////////////////////////////// +bool Downloader:: +parse_http_response(const string &resp) { + size_t ws = resp.find(" ", 0); + string httpstr = resp.substr(0, ws); + if (!(httpstr == "HTTP/1.1")) { + downloader_cat.error() + << "Downloader::parse_http_response() - not HTTP/1.1 - got: " + << httpstr << endl; + return false; + } + size_t ws2 = resp.find(" ", ws); + string numstr = resp.substr(ws, ws2); + nassertr(numstr.length() > 0, false); + int num = atoi(numstr.c_str()); + switch (num) { + case 200: + case 206: + return true; + case 202: + // Accepted - server may not honor request, though + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::parse_http_response() - got a 202 Accepted - " + << "server does not guarantee to honor this request" << endl; + return true; + case 201: + case 203: + case 204: + case 205: + default: + break; + } + + downloader_cat.error() + << "Downloader::parse_http_response() - Invalid response: " + << resp << endl; + return false; +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::parse_header +// Access: Private +// Description: Looks for a valid header. If it finds one, it +// calculates the header length and strips it from +// the download status structure. Function returns false +// on an error condition, otherwise true. +//////////////////////////////////////////////////////////////////// +bool Downloader:: +parse_header(DownloadStatus &status) { + + if (status._header_is_complete == true) + return true; + + if (status._bytes_in_buffer == 0) { + downloader_cat.error() + << "Downloader::parse_header() - Empty buffer!" << endl; + return false; + } + + string bufstr((char *)status._start, status._bytes_in_buffer); + size_t p = 0; + while (p < bufstr.length()) { + // Server sends out CR LF (\r\n) as newline delimiter + size_t nl = bufstr.find("\015\012", p); + if (nl == string::npos) { + downloader_cat.error() + << "Downloader::parse_header() - No newlines in buffer of " + << "length: " << status._bytes_in_buffer << endl; + return false; + } else if (p == 0 && nl == p) { + downloader_cat.error() + << "Downloader::parse_header() - Buffer begins with newline!" + << endl; + return false; + } + + string component = bufstr.substr(p, nl - p); + + // The first line of the response should say whether + // got an error or not + if (status._first_line_complete == false) { + status._first_line_complete = true; + if (parse_http_response(component) == true) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::parse_header() - Header is valid: " + << component << endl; + status._header_is_valid = true; + } else { + return false; + } + } + + // Look for content length + size_t cpos = component.find(":"); + string tline = component.substr(0, cpos); + if (status._partial_content == true && tline == "Content-Length") { + tline = component.substr(cpos + 2, string::npos); + int server_download_bytes = atoi(tline.c_str()); + int client_download_bytes = status._last_byte - status._first_byte; + if (status._first_byte == 0) + client_download_bytes += 1; + if (client_download_bytes != server_download_bytes) { + downloader_cat.error() + << "Downloader::parse_header() - server size = " + << server_download_bytes << ", client size = " + << client_download_bytes << " (" + << status._last_byte << "-" << status._first_byte << ")" << endl; + return false; + } + } + + // Two consecutive (CR LF)s indicates end of HTTP header + if (nl == p) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::parse_header() - Header is complete" << endl; + status._header_is_complete = true; + + // Strip the header out of the status buffer + int header_length = nl + 2; + status._start += header_length; + status._bytes_in_buffer -= header_length; + + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::parse_header() - Stripping out header of size: " + << header_length << endl; + + return true; + } + + p = nl + 2; + } + + if (status._header_is_complete == false) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::parse_header() - Reached end of buffer without " + << "successfully parsing the header - buffer size: " + << status._bytes_in_buffer << endl; + } + + return true; +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::write_to_disk +// Access: Private +// Description: Writes a download to disk. If there is a header, +// the pointer and size are adjusted so the header +// is excluded. Function returns false on error +// condition. +//////////////////////////////////////////////////////////////////// +bool Downloader:: +write_to_disk(DownloadStatus &status) { + + // Ensure the header has been parsed successfully first + if (parse_header(status) == false) + return false; + + if (status._header_is_complete == false) { + downloader_cat.error() + << "Downloader::write_to_disk() - Incomplete HTTP header - " + << "(or header was larger than download buffer) - " + << "try increasing download-buffer-size" << endl; + return false; + } + + // Write what we have so far to disk + if (status._bytes_in_buffer > 0) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::write_to_disk() - Writing " + << status._bytes_in_buffer << " to disk" << endl; + + _dest_stream.write(status._start, status._bytes_in_buffer); + status._total_bytes_written += status._bytes_in_buffer; + + // Throw an event to indicate how many bytes have been written so far + if (!status._event_name.empty()) { + PT_Event write_event = new Event(status._event_name); + write_event->add_parameter(EventParameter((int)status._id)); + write_event->add_parameter(EventParameter(DS_write)); + write_event->add_parameter(EventParameter(status._total_bytes_written)); + throw_event(write_event); + } + } + + status.reset(); + + return true; +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::DownloadStatus::constructor +// Access: Private +// Description: +//////////////////////////////////////////////////////////////////// +Downloader::DownloadStatus:: +DownloadStatus(char *buffer, const string &event_name, int first_byte, + int last_byte, int total_bytes, bool partial_content, uint id) { + _first_line_complete = false; + _header_is_complete = false; + _header_is_valid = false; + _buffer = buffer; + _event_name = event_name; + _first_byte = first_byte; + _last_byte = last_byte; + _total_bytes = total_bytes; + _partial_content = partial_content; + _id = id; + reset(); +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::DownloadStatus::reset +// Access: Public +// Description: Resets the status buffer for more downloading after +// a write. +//////////////////////////////////////////////////////////////////// +void Downloader::DownloadStatus:: +reset(void) { + _start = _buffer; + _next_in = _start; + _bytes_in_buffer = 0; + _total_bytes_written = 0; +} diff --git a/panda/src/downloader/asyncDownloader.h b/panda/src/downloader/asyncDownloader.h new file mode 100644 index 0000000000..4a3e2f2408 --- /dev/null +++ b/panda/src/downloader/asyncDownloader.h @@ -0,0 +1,137 @@ +// Filename: downloader.h +// Created by: mike (09Jan97) +// +//////////////////////////////////////////////////////////////////// +// +#ifndef ASYNCDOWNLOADER_H +#define ASYNCDOWNLOADER_H +// +//////////////////////////////////////////////////////////////////// +// Includes +//////////////////////////////////////////////////////////////////// +#include +#include +#include +#include +#include +#include "asyncUtility.h" + +#if defined(WIN32_VC) + #include +#else + #include + #include + #include +#endif + +class DownloaderToken; + +//////////////////////////////////////////////////////////////////// +// Class : Downloader +// Description : +//////////////////////////////////////////////////////////////////// +class EXPCL_PANDAEXPRESS Downloader : public AsyncUtility { +PUBLISHED: + enum DownloadCode { + DS_write = 2, + DS_success = 1, + DS_abort = -1, + DS_timeout = -2 + }; + Downloader(void); + //Downloader(PT(Buffer) buffer); + virtual ~Downloader(void); + + bool connect_to_server(const string &name, uint port=80); + void disconnect_from_server(void); + + int request_sync_download(const string &file_name, const Filename &file_dest, + const string &event_name); + int request_sync_download(const string &file_name, const Filename &file_dest, + const string &event_name, int first_byte, + int last_byte, int total_bytes, + bool partial_content = true); + int request_download(const string &file_name, const Filename &file_dest, + const string &event_name, bool sync = false); + int request_download(const string &file_name, const Filename &file_dest, + const string &event_name, int first_byte, + int last_byte, int total_bytes, + bool partial_content = true, bool sync = false); + + INLINE void set_byte_rate(float bytes); + INLINE float get_byte_rate(void) const; + INLINE void set_disk_write_frequency(int frequency); + INLINE int get_disk_write_frequency(void) const; + INLINE void enable_download(bool val); + INLINE bool is_download_enabled(void) const; + INLINE bool get_last_attempt_stalled(void) const; + +private: + class DownloadStatus { + public: + DownloadStatus(char *buffer, const string &event_name, int first_byte, + int last_byte, int total_bytes, bool partial_content, + uint id); + void reset(void); + + public: + bool _first_line_complete; + bool _header_is_complete; + bool _header_is_valid; + char *_start; + char *_next_in; + int _bytes_in_buffer; + string _event_name; + int _total_bytes_written; + int _first_byte; + int _last_byte; + int _total_bytes; + bool _partial_content; + uint _id; + char *_buffer; + }; + + void init(); + int download(const string &file_name, Filename file_dest, + const string &event_name, int first_byte, + int last_byte, int total_bytes, bool partial_content, + bool sync, uint id); + virtual bool process_request(void); + bool parse_header(DownloadStatus &status); + bool write_to_disk(DownloadStatus &status); + bool connect_to_server(void); + int safe_send(int socket, const char *data, int length, long timeout); + int safe_receive(int socket, DownloadStatus &status, int length, + long timeout, int &bytes); + bool parse_http_response(const string &resp); + int attempt_read(int length, DownloadStatus &status, int &bytes_read); + + typedef TokenBoard DownloaderTokenBoard; + DownloaderTokenBoard *_token_board; + + bool _connected; + +#ifdef HAVE_IPC + mutex _buffer_lock; +#endif + + int _socket; + PT(Buffer) _buffer; + int _disk_write_frequency; + int _new_disk_write_frequency; + float _byte_rate; + float _new_byte_rate; + int _read_size; + bool _download_enabled; + ofstream _dest_stream; + int _disk_buffer_size; + bool _last_attempt_stalled; + bool _current_attempt_stalled; + + string _server_name; + struct sockaddr_in _sin; +}; + +#include "asyncDownloader.I" + +#endif diff --git a/panda/src/downloader/download.cxx b/panda/src/downloader/download.cxx deleted file mode 100644 index df96fb21c3..0000000000 --- a/panda/src/downloader/download.cxx +++ /dev/null @@ -1,653 +0,0 @@ -// Filename: download.cxx -// Created by: mike (09Jan97) -// -//////////////////////////////////////////////////////////////////// -// -//////////////////////////////////////////////////////////////////// -// Includes -//////////////////////////////////////////////////////////////////// -#include "download.h" -#include "config_downloader.h" - -#include -#include -#include - -#if !defined(WIN32_VC) - #include - #include - #include - #include -#endif - -//////////////////////////////////////////////////////////////////// -// Defines -//////////////////////////////////////////////////////////////////// -enum SafeSendCode { - SS_success = 1, - SS_error = -1, - SS_timeout = -2, -}; - -enum FastReceiveCode { - FR_eof = 2, - FR_success = 1, - FR_error = -1, - FR_timeout = -2, - FR_no_data = -3, -}; - -//////////////////////////////////////////////////////////////////// -// Function: Download::Constructor -// Access: Published -// Description: -//////////////////////////////////////////////////////////////////// -Download:: -Download(void) { - _frequency = downloader_frequency; - _byte_rate = downloader_byte_rate; - _disk_write_frequency = downloader_disk_write_frequency; - nassertv(_frequency > 0 && _byte_rate > 0 && _disk_write_frequency > 0); - _receive_size = _byte_rate * _frequency; - _disk_buffer_size = _disk_write_frequency * _receive_size; - _buffer = new Buffer(_disk_buffer_size); - - _connected = false; - // We need to flush after every write in case we're interrupted - _dest_stream.setf(ios::unitbuf, 0); - _current_status = NULL; - _recompute_buffer = false; - - _tfirst = 0.0; - _tlast = 0.0; - _got_any_data = false; - -#if defined(WIN32) - WSAData mydata; - int answer1 = WSAStartup(0x0101, &mydata); - if(answer1 != 0) { - downloader_cat.error() - << "Downloader::Downloader() - Error initializing TCP stack!" - << endl; - } -#endif -} - -//////////////////////////////////////////////////////////////////// -// Function: Download::Destructor -// Access: Public -// Description: -//////////////////////////////////////////////////////////////////// -Download:: -~Download() { - if (_connected) - disconnect_from_server(); - delete _buffer; - if (_current_status != NULL) - delete _current_status; -} - -//////////////////////////////////////////////////////////////////// -// Function: Download::connect_to_server -// Access: Public -// Description: -//////////////////////////////////////////////////////////////////// -bool Download:: -connect_to_server(const string &name, uint port) { - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Download connecting to server: " << name << " on port: " - << port << endl; - - _server_name = name; - - _sin.sin_family = PF_INET; - _sin.sin_port = htons(port); - ulong addr = (ulong)inet_addr(name.c_str()); - struct hostent *hp = NULL; - - if (addr == INADDR_NONE) { - hp = gethostbyname(name.c_str()); - if (hp != NULL) - (void)memcpy(&_sin.sin_addr, hp->h_addr, (uint)hp->h_length); - else { - downloader_cat.error() - << "Downloader::connect_to_server() - gethostbyname() failed: " - << strerror(errno) << endl; - return false; - } - } else - (void)memcpy(&_sin.sin_addr, &addr, sizeof(addr)); - - return connect_to_server(); -} - -//////////////////////////////////////////////////////////////////// -// Function: Download::connect_to_server -// Access: Private -// Description: -//////////////////////////////////////////////////////////////////// -bool Download:: -connect_to_server(void) { - if (_connected == true) - return true; - - _socket = 0xffffffff; - _socket = socket(PF_INET, SOCK_STREAM, 0); - if (_socket == (int)0xffffffff) { - downloader_cat.error() - << "Download::connect_to_server() - socket failed: " - << strerror(errno) << endl; - return false; - } - - _connected = true; - - if (connect(_socket, (struct sockaddr *)&_sin, sizeof(_sin)) < 0) { - downloader_cat.error() - << "Download::connect_to_server() - connect() failed: " - << strerror(errno) << endl; - disconnect_from_server(); - _connected = false; - } - - return _connected; -} - -/////////////////////////////////////////////////////////////////// -// Function: Download::disconnect_from_server -// Access: Public -// Description: -//////////////////////////////////////////////////////////////////// -void Download:: -disconnect_from_server(void) { - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Download disconnecting from server..." << endl; -#if defined(WIN32) - (void)closesocket(_socket); -#else - (void)close(_socket); -#endif - _connected = false; -} - -//////////////////////////////////////////////////////////////////// -// Function: Download::safe_send -// Access: Private -// Description: -//////////////////////////////////////////////////////////////////// -int Download:: -safe_send(int socket, const char *data, int length, long timeout) { - if (length == 0) { - downloader_cat.error() - << "Download::safe_send() - requested 0 length send!" << endl; - return SS_error; - } - int bytes = 0; - struct timeval tv; - tv.tv_sec = timeout; - tv.tv_usec = 0; - fd_set wset; - FD_ZERO(&wset); - while (bytes < length) { - FD_SET(socket, &wset); - int sret = select(socket + 1, NULL, &wset, NULL, &tv); - if (sret == 0) { - downloader_cat.error() - << "Download::safe_send() - select timed out after: " - << timeout << " seconds" << endl; - return SS_timeout; - } else if (sret == -1) { - downloader_cat.error() - << "Download::safe_send() - error: " << strerror(errno) << endl; - return SS_error; - } - int ret = send(socket, data, length, 0); - if (ret > 0) - bytes += ret; - else { - downloader_cat.error() - << "Download::safe_send() - error: " << strerror(errno) << endl; - return SS_error; - } - } - return SS_success; -} - -//////////////////////////////////////////////////////////////////// -// Function: Download::fast_receive -// Access: Private -// Description: -//////////////////////////////////////////////////////////////////// -int Download:: -fast_receive(int socket, DownloadStatus *status, int rec_size) { - nassertr(status != NULL, FR_error); - if (rec_size <= 0) { - downloader_cat.error() - << "Download::fast_receive() - Invalid receive size: " << rec_size - << endl; - return FR_error; - } - - // Poll the socket with select() to see if there is any data - struct timeval tv; - tv.tv_sec = 0; - tv.tv_usec = 0; - fd_set rset; - FD_ZERO(&rset); - FD_SET(socket, &rset); - int sret = select(socket + 1, &rset, NULL, NULL, &tv); - if (sret == 0) { - return FR_no_data; - } else if (sret == -1) { - downloader_cat.error() - << "Downloader::safe_receive() - error: " << strerror(errno) << endl; - return FR_error; - } - int ret = recv(socket, status->_next_in, rec_size, 0); - if (ret == 0) { - return FR_eof; - } else if (ret == -1) { - downloader_cat.error() - << "Download::fast_receive() - error: " << strerror(errno) << endl; - return FR_error; - } - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Download::fast_receive() - recv() got: " << ret << " bytes" - << endl; - status->_next_in += ret; - status->_bytes_in_buffer += ret; - status->_total_bytes += ret; - return ret; -} - -//////////////////////////////////////////////////////////////////// -// Function: Download::initiate -// Access: Published -// Description: Initiate the download of a complete file from the server. -//////////////////////////////////////////////////////////////////// -int Download:: -initiate(const string &file_name, Filename file_dest) { - return initiate(file_name, file_dest, 0, 0, 0, false); -} - -//////////////////////////////////////////////////////////////////// -// Function: Download::initiate -// Access: Published -// Description: Initiate the download of a file from a server. -//////////////////////////////////////////////////////////////////// -int Download:: -initiate(const string &file_name, Filename file_dest, - int first_byte, int last_byte, int total_bytes, - bool partial_content) { - - // Connect to the server - if (connect_to_server() == false) - return DS_error_connect; - - // Attempt to open the destination file - file_dest.set_binary(); - bool result; - if (partial_content == true && first_byte > 0) - result = file_dest.open_append(_dest_stream); - else - result = file_dest.open_write(_dest_stream); - if (result == false) { - downloader_cat.error() - << "Downloader::download() - Error opening file: " << file_dest - << " for writing" << endl; - return DS_error_write; - } - - // Send an HTTP request for the file to the server - string request = "GET "; - request += file_name; - request += " HTTP/1.1\012Host: "; - request += _server_name; - request += "\012Connection: close"; - if (partial_content == true) { - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Downloader::download() - Requesting byte range: " << first_byte - << "-" << last_byte << endl; - request += "\012Range: bytes="; - stringstream start_stream; - start_stream << first_byte << "-" << last_byte; - request += start_stream.str(); - } - request += "\012\012"; - int outlen = request.size(); - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Downloader::download() - Sending request:\n" << request << endl; - int send_ret = safe_send(_socket, request.c_str(), outlen, - (long)downloader_timeout); - - // Handle timeouts on the send - if (send_ret == SS_timeout) { - downloader_cat.error() - << "Download::initiate() - send timed out" << endl; - return DS_timeout; - } - - if (send_ret == SS_error) - return DS_error_connect; - - // Create a download status to maintain download progress information - if (_current_status != NULL) - delete _current_status; - _current_status = new DownloadStatus(_buffer->_buffer, - first_byte, last_byte, total_bytes, - partial_content); - - return DS_success; -} - -//////////////////////////////////////////////////////////////////// -// Function: Download::run -// Access: Published -// Description: -//////////////////////////////////////////////////////////////////// -int Download:: -run(void) { - if (_current_status == NULL) { - downloader_cat.error() - << "Download::run() - Did not call initiate() first" << endl; - return DS_error; - } - - if (connect_to_server() == false) - return DS_error_connect; - - int ret = DS_ok; - double t0 = _clock.get_real_time(); - if (_tfirst == 0.0) { - _tfirst = t0; - } - if (t0 - _tlast < _frequency) - return DS_ok; - - // Recompute the buffer size if necessary - if (_recompute_buffer == true) { - - // Flush the current buffer if it holds any data - if (_current_status->_bytes_in_buffer > 0) { - if (write_to_disk(_current_status) == false) { - return DS_error_write; - } - ret = DS_write; - } - - // Allocate a new buffer - _buffer.clear(); - _receive_size = (int)ceil(_frequency * _byte_rate); - _disk_buffer_size = _receive_size * _disk_write_frequency; - _buffer = new Buffer(_disk_buffer_size); - _current_status->_buffer = _buffer->_buffer; - _current_status->reset(); - - } else if (_current_status->_bytes_in_buffer + _receive_size > - _disk_buffer_size) { - - // Flush the current buffer if the next request would overflow it - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Download::run() - Flushing buffer" << endl; - if (write_to_disk(_current_status) == false) - return DS_error_write; - ret = DS_write; - } - - // Attempt to receive the bytes from the socket - int bytes_read = fast_receive(_socket, _current_status, _receive_size); - _tlast = _clock.get_real_time(); - - // Check for end of file - if (bytes_read == 0) { - if (_got_any_data == true) { - if (_current_status->_bytes_in_buffer > 0) { - if (write_to_disk(_current_status) == false) - return DS_error_write; - ret = DS_write; - } - return DS_success; - } else { - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Download::run() - Got 0 bytes" << endl; - return DS_ok; - } - } else if (bytes_read < 0) { - return DS_error; - } - - _got_any_data = true; - return ret; -} - -//////////////////////////////////////////////////////////////////// -// Function: Download::parse_http_response -// Access: Private -// Description: Check the HTTP response from the server -//////////////////////////////////////////////////////////////////// -bool Download:: -parse_http_response(const string &resp) { - size_t ws = resp.find(" ", 0); - string httpstr = resp.substr(0, ws); - if (!(httpstr == "HTTP/1.1")) { - downloader_cat.error() - << "Download::parse_http_response() - not HTTP/1.1 - got: " - << httpstr << endl; - return false; - } - size_t ws2 = resp.find(" ", ws); - string numstr = resp.substr(ws, ws2); - nassertr(numstr.length() > 0, false); - int num = atoi(numstr.c_str()); - switch (num) { - case 200: - case 206: - return true; - case 202: - // Accepted - server may not honor request, though - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Download::parse_http_response() - got a 202 Accepted - " - << "server does not guarantee to honor this request" << endl; - return true; - case 201: - case 203: - case 204: - case 205: - default: - break; - } - - downloader_cat.error() - << "Download::parse_http_response() - Invalid response: " - << resp << endl; - return false; -} - -//////////////////////////////////////////////////////////////////// -// Function: Download::parse_header -// Access: Private -// Description: Looks for a valid header. If it finds one, it -// calculates the header length and strips it from -// the download status structure. Function returns false -// on an error condition, otherwise true. -//////////////////////////////////////////////////////////////////// -bool Download:: -parse_header(DownloadStatus *status) { - nassertr(status != NULL, false); - - if (status->_header_is_complete == true) - return true; - - if (status->_bytes_in_buffer == 0) { - downloader_cat.error() - << "Download::parse_header() - Empty buffer!" << endl; - return false; - } - - string bufstr((char *)status->_start, status->_bytes_in_buffer); - size_t p = 0; - while (p < bufstr.length()) { - // Server sends out CR LF (\r\n) as newline delimiter - size_t nl = bufstr.find("\015\012", p); - if (nl == string::npos) { - downloader_cat.error() - << "Download::parse_header() - No newlines in buffer of " - << "length: " << status->_bytes_in_buffer << endl; - return false; - } else if (p == 0 && nl == p) { - downloader_cat.error() - << "Download::parse_header() - Buffer begins with newline!" - << endl; - return false; - } - - string component = bufstr.substr(p, nl - p); - - // The first line of the response should say whether - // got an error or not - if (status->_first_line_complete == false) { - status->_first_line_complete = true; - if (parse_http_response(component) == true) { - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Download::parse_header() - Header is valid: " - << component << endl; - status->_header_is_valid = true; - } else { - return false; - } - } - - // Look for content length - size_t cpos = component.find(":"); - string tline = component.substr(0, cpos); - if (status->_partial_content == true && tline == "Content-Length") { - tline = component.substr(cpos + 2, string::npos); - int server_download_bytes = atoi(tline.c_str()); - int client_download_bytes = status->_last_byte - status->_first_byte; - if (status->_first_byte == 0) - client_download_bytes += 1; - if (client_download_bytes != server_download_bytes) { - downloader_cat.error() - << "Download::parse_header() - server size = " - << server_download_bytes << ", client size = " - << client_download_bytes << " (" - << status->_last_byte << "-" << status->_first_byte << ")" << endl; - return false; - } - } - - // Two consecutive (CR LF)s indicates end of HTTP header - if (nl == p) { - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Download::parse_header() - Header is complete" << endl; - status->_header_is_complete = true; - - // Strip the header out of the status buffer - int header_length = nl + 2; - status->_start += header_length; - status->_bytes_in_buffer -= header_length; - - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Download::parse_header() - Stripping out header of size: " - << header_length << endl; - - return true; - } - - p = nl + 2; - } - - if (status->_header_is_complete == false) { - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Download::parse_header() - Reached end of buffer without " - << "successfully parsing the header - buffer size: " - << status->_bytes_in_buffer << endl; - } - - return true; -} - -//////////////////////////////////////////////////////////////////// -// Function: Download::write_to_disk -// Access: Private -// Description: Writes a download to disk. If there is a header, -// the pointer and size are adjusted so the header -// is excluded. Function returns false on error -// condition. -//////////////////////////////////////////////////////////////////// -bool Download:: -write_to_disk(DownloadStatus *status) { - nassertr(status != NULL, false); - - // Ensure the header has been parsed successfully first - if (parse_header(status) == false) - return false; - - if (status->_header_is_complete == false) { - downloader_cat.error() - << "Download::write_to_disk() - Incomplete HTTP header - " - << "(or header was larger than download buffer) - " - << "try increasing download-buffer-size" << endl; - return false; - } - - // Write what we have so far to disk - if (status->_bytes_in_buffer > 0) { - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Download::write_to_disk() - Writing " - << status->_bytes_in_buffer << " to disk" << endl; - - _dest_stream.write(status->_start, status->_bytes_in_buffer); - status->_total_bytes_written += status->_bytes_in_buffer; - } - - status->reset(); - - return true; -} - -//////////////////////////////////////////////////////////////////// -// Function: Download::DownloadStatus::constructor -// Access: Private -// Description: -//////////////////////////////////////////////////////////////////// -Download::DownloadStatus:: -DownloadStatus(char *buffer, int first_byte, int last_byte, - int total_bytes, bool partial_content) { - _first_line_complete = false; - _header_is_complete = false; - _header_is_valid = false; - _buffer = buffer; - _first_byte = first_byte; - _last_byte = last_byte; - _total_bytes = total_bytes; - _partial_content = partial_content; - reset(); -} - -//////////////////////////////////////////////////////////////////// -// Function: Download::DownloadStatus::reset -// Access: Public -// Description: Resets the status buffer for more downloading after -// a write. -//////////////////////////////////////////////////////////////////// -void Download::DownloadStatus:: -reset(void) { - _start = _buffer; - _next_in = _start; - _bytes_in_buffer = 0; - _total_bytes_written = 0; -} diff --git a/panda/src/downloader/download.h b/panda/src/downloader/download.h deleted file mode 100644 index 0a858e67be..0000000000 --- a/panda/src/downloader/download.h +++ /dev/null @@ -1,119 +0,0 @@ -// Filename: download.h -// Created by: mike (09Jan97) -// -//////////////////////////////////////////////////////////////////// -// -#ifndef DOWNLOAD_H -#define DOWNLOAD_H -// -//////////////////////////////////////////////////////////////////// -// Includes -//////////////////////////////////////////////////////////////////// -#include -#include -#include -#include -#include -#include - -#if defined(WIN32_VC) - #include -#else - #include - #include - #include -#endif - -//////////////////////////////////////////////////////////////////// -// Class : Download -// Description : -//////////////////////////////////////////////////////////////////// -class EXPCL_PANDAEXPRESS Download { -PUBLISHED: - enum DownloadCode { - DS_ok = 3, - DS_write = 2, - DS_success = 1, - DS_error = -1, - DS_error_write = -2, - DS_error_connect = -3, - }; - - Download(void); - virtual ~Download(void); - - bool connect_to_server(const string &name, uint port=80); - void disconnect_from_server(void); - - int initiate(const string &file_name, Filename file_dest); - int initiate(const string &file_name, Filename file_dest, - int first_byte, int last_byte, int total_bytes, - bool partial_content = true); - int run(void); - - INLINE void set_frequency(float frequency); - INLINE float get_frequency(void) const; - INLINE void set_byte_rate(float bytes); - INLINE float get_byte_rate(void) const; - INLINE void set_disk_write_frequency(int frequency); - INLINE int get_disk_write_frequency(void) const; - INLINE int get_bytes_written(void) const; - INLINE float get_bytes_per_second(void) const; - -private: - class DownloadStatus { - public: - DownloadStatus(char *buffer, int first_byte, int last_byte, - int total_bytes, bool partial_content); - void reset(void); - - public: - bool _first_line_complete; - bool _header_is_complete; - bool _header_is_valid; - char *_start; - char *_next_in; - int _bytes_in_buffer; - int _total_bytes_written; - int _first_byte; - int _last_byte; - int _total_bytes; - bool _partial_content; - char *_buffer; - }; - - INLINE void recompute_buffer(void); - - bool connect_to_server(void); - int safe_send(int socket, const char *data, int length, long timeout); - int fast_receive(int socket, DownloadStatus *status, int rec_size); - bool parse_http_response(const string &resp); - bool parse_header(DownloadStatus *status); - bool write_to_disk(DownloadStatus *status); - -private: - bool _connected; - int _socket; - string _server_name; - struct sockaddr_in _sin; - - PT(Buffer) _buffer; - int _disk_write_frequency; - float _frequency; - float _byte_rate; - int _receive_size; - int _disk_buffer_size; - ofstream _dest_stream; - bool _recompute_buffer; - - DownloadStatus *_current_status; - bool _got_any_data; - - double _tlast; - double _tfirst; - ClockObject _clock; -}; - -#include "download.I" - -#endif diff --git a/panda/src/downloader/downloader.I b/panda/src/downloader/downloader.I index 850c6dd292..0f4d4ee829 100644 --- a/panda/src/downloader/downloader.I +++ b/panda/src/downloader/downloader.I @@ -5,6 +5,30 @@ #include "config_downloader.h" +//////////////////////////////////////////////////////////////////// +// Function: Downloader::set_frequency +// Access: Public +// Description: +//////////////////////////////////////////////////////////////////// +INLINE void Downloader:: +set_frequency(float frequency) { + nassertv(frequency > 0.0); + if (_frequency != frequency) { + _frequency = frequency; + _recompute_buffer = true; + } +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::get_frequency +// Access: Public +// Description: +//////////////////////////////////////////////////////////////////// +INLINE float Downloader:: +get_frequency(void) const { + return _frequency; +} + //////////////////////////////////////////////////////////////////// // Function: Downloader::set_byte_rate // Access: Public @@ -14,17 +38,10 @@ INLINE void Downloader:: set_byte_rate(float bytes) { nassertv(bytes > 0.0); - if (bytes == _byte_rate) - return; -#ifdef HAVE_IPC - _buffer_lock.lock(); -#endif - - _new_byte_rate = bytes; - -#ifdef HAVE_IPC - _buffer_lock.unlock(); -#endif + if (_byte_rate != bytes) { + _byte_rate = bytes; + _recompute_buffer = true; + } } //////////////////////////////////////////////////////////////////// @@ -37,26 +54,6 @@ get_byte_rate(void) const { return _byte_rate; } -//////////////////////////////////////////////////////////////////// -// Function: Downloader::enable_download -// Access: Public -// Description: -//////////////////////////////////////////////////////////////////// -INLINE void Downloader:: -enable_download(bool val) { - _download_enabled = val; -} - -//////////////////////////////////////////////////////////////////// -// Function: Downloader::is_download_enabled -// Access: Public -// Description: -//////////////////////////////////////////////////////////////////// -INLINE bool Downloader:: -is_download_enabled(void) const { - return _download_enabled; -} - //////////////////////////////////////////////////////////////////// // Function: Downloader::set_disk_write_frequency // Access: Public @@ -65,15 +62,10 @@ is_download_enabled(void) const { INLINE void Downloader:: set_disk_write_frequency(int frequency) { nassertv(frequency > 0); -#ifdef HAVE_IPC - _buffer_lock.lock(); -#endif - - _new_disk_write_frequency = frequency; - -#ifdef HAVE_IPC - _buffer_lock.unlock(); -#endif + if (_disk_write_frequency != frequency) { + _disk_write_frequency = frequency; + _recompute_buffer = true; + } } //////////////////////////////////////////////////////////////////// @@ -87,11 +79,24 @@ get_disk_write_frequency(void) const { } //////////////////////////////////////////////////////////////////// -// Function: Downloader::get_last_attempt_stalled +// Function: Downloader::get_bytes_written // Access: Public // Description: //////////////////////////////////////////////////////////////////// -INLINE bool Downloader:: -get_last_attempt_stalled(void) const { - return _last_attempt_stalled; +INLINE int Downloader:: +get_bytes_written(void) const { + nassertr(_current_status != NULL, 0); + return _current_status->_total_bytes_written; +} + +//////////////////////////////////////////////////////////////////// +// Function: Downloader::get_bytes_per_second +// Access: Public +// Description: +//////////////////////////////////////////////////////////////////// +INLINE float Downloader:: +get_bytes_per_second(void) const { + nassertr(_tlast - _tfirst > 0.0, 0.0); + nassertr(_current_status != NULL, 0.0); + return (float)((double)_current_status->_total_bytes / (_tlast - _tfirst)); } diff --git a/panda/src/downloader/downloader.cxx b/panda/src/downloader/downloader.cxx index 9c0f75fc04..a64e555c15 100644 --- a/panda/src/downloader/downloader.cxx +++ b/panda/src/downloader/downloader.cxx @@ -9,18 +9,11 @@ #include "downloader.h" #include "config_downloader.h" -#include -#include -#include -#include -#include #include -#include #include #include #if !defined(WIN32_VC) -// #define errno wsaGetLastError() #include #include #include @@ -30,93 +23,44 @@ //////////////////////////////////////////////////////////////////// // Defines //////////////////////////////////////////////////////////////////// -enum send_status { - SS_error, - SS_timeout, - SS_success +enum SafeSendCode { + SS_success = 1, + SS_error = -1, + SS_timeout = -2, }; -enum receive_status { - RS_error, - RS_timeout, - RS_success, - RS_eof -}; - -//////////////////////////////////////////////////////////////////// -// Class : DownloaderToken -// Description : Holds a request for the downloader. -//////////////////////////////////////////////////////////////////// -class DownloaderToken : public ReferenceCount { -public: - INLINE DownloaderToken(uint id, const string &file_name, - const Filename &file_dest, const string &event_name, - int first_byte, int last_byte, int total_bytes, - bool partial_content, bool sync) : _id(id), _first_byte(first_byte), - _last_byte(last_byte), _total_bytes(total_bytes) { - _file_name = file_name; - _event_name = event_name; - _file_dest = file_dest; - _partial_content = partial_content; - _sync = sync; - } - uint _id; - string _file_name; - Filename _file_dest; - string _event_name; - int _first_byte; - int _last_byte; - int _total_bytes; - bool _partial_content; - bool _sync; +enum FastReceiveCode { + FR_eof = 2, + FR_success = 1, + FR_error = -1, + FR_timeout = -2, + FR_no_data = -3, }; //////////////////////////////////////////////////////////////////// // Function: Downloader::Constructor -// Access: Public +// Access: Published // Description: //////////////////////////////////////////////////////////////////// Downloader:: -Downloader(void) : AsyncUtility() { - init(); -} - -#if 0 -//////////////////////////////////////////////////////////////////// -// Function: Downloader::Constructor -// Access: Public -// Description: -//////////////////////////////////////////////////////////////////// -Downloader:: -Downloader(PT(Buffer) buffer) : AsyncUtility() { - init(buffer); -} -#endif - -//////////////////////////////////////////////////////////////////// -// Function: Downloader::init -// Access: Private -// Description: -//////////////////////////////////////////////////////////////////// -void Downloader:: -init(void) { - _disk_write_frequency = downloader_disk_write_frequency; - _new_disk_write_frequency = 0; - _byte_rate = downloader_byte_rate; - _new_byte_rate = 0; +Downloader(void) { _frequency = downloader_frequency; - nassertv(_frequency > 0 && _byte_rate > 0); - _read_size = _byte_rate * _frequency; - _disk_buffer_size = _disk_write_frequency * _read_size; + _byte_rate = downloader_byte_rate; + _disk_write_frequency = downloader_disk_write_frequency; + nassertv(_frequency > 0 && _byte_rate > 0 && _disk_write_frequency > 0); + _receive_size = _byte_rate * _frequency; + _disk_buffer_size = _disk_write_frequency * _receive_size; _buffer = new Buffer(_disk_buffer_size); + _connected = false; - _token_board = new DownloaderTokenBoard; - _download_enabled = true; - _last_attempt_stalled = true; // We need to flush after every write in case we're interrupted _dest_stream.setf(ios::unitbuf, 0); - _last_attempt_stalled = false; - _current_attempt_stalled = false; + _current_status = NULL; + _recompute_buffer = false; + + _tfirst = 0.0; + _tlast = 0.0; + _got_any_data = false; #if defined(WIN32) WSAData mydata; @@ -138,23 +82,22 @@ Downloader:: ~Downloader() { if (_connected) disconnect_from_server(); - - destroy_thread(); - - delete _token_board; + delete _buffer; + if (_current_status != NULL) + delete _current_status; } //////////////////////////////////////////////////////////////////// // Function: Downloader::connect_to_server -// Access: Public +// Access: Public // Description: //////////////////////////////////////////////////////////////////// bool Downloader:: connect_to_server(const string &name, uint port) { if (downloader_cat.is_debug()) downloader_cat.debug() - << "Downloader connecting to server: " << name - << " on port: " << port << endl; + << "Download connecting to server: " << name << " on port: " + << port << endl; _server_name = name; @@ -169,8 +112,8 @@ connect_to_server(const string &name, uint port) { (void)memcpy(&_sin.sin_addr, hp->h_addr, (uint)hp->h_length); else { downloader_cat.error() - << "Downloader::connect_to_server() - gethostbyname() failed: " - << strerror(errno) << endl; + << "Downloader::connect_to_server() - gethostbyname() failed: " + << strerror(errno) << endl; return false; } } else @@ -213,192 +156,22 @@ connect_to_server(void) { /////////////////////////////////////////////////////////////////// // Function: Downloader::disconnect_from_server -// Access: Public +// Access: Public // Description: //////////////////////////////////////////////////////////////////// void Downloader:: disconnect_from_server(void) { if (downloader_cat.is_debug()) downloader_cat.debug() - << "Downloader disconnecting from server..." << endl; - + << "Download disconnecting from server..." << endl; #if defined(WIN32) (void)closesocket(_socket); #else (void)close(_socket); #endif - _connected = false; } -//////////////////////////////////////////////////////////////////// -// Function: Downloader::request_sync_download -// Access: Public -// Description: Requests the synchronous download of a complete file. -//////////////////////////////////////////////////////////////////// -int Downloader:: -request_sync_download(const string &file_name, const Filename &file_dest, - const string &event_name) { - return request_download(file_name, file_dest, event_name, true); -} - -//////////////////////////////////////////////////////////////////// -// Function: Downloader::request_sync_download -// Access: Public -// Description: Requests the synchronous download of a complete file. -//////////////////////////////////////////////////////////////////// -int Downloader:: -request_sync_download(const string &file_name, const Filename &file_dest, - const string &event_name, int first_byte, - int last_byte, int total_bytes, bool partial_content) { - return request_download(file_name, file_dest, event_name, first_byte, - last_byte, total_bytes, partial_content, true); -} - -//////////////////////////////////////////////////////////////////// -// Function: Downloader::request_download -// Access: Public -// Description: Requests the download of a complete file. -//////////////////////////////////////////////////////////////////// -int Downloader:: -request_download(const string &file_name, const Filename &file_dest, - const string &event_name, bool sync) { - return request_download(file_name, file_dest, event_name, 0, 0, 0, - false, sync); -} - -//////////////////////////////////////////////////////////////////// -// Function: Downloader::request_download -// Access: Public -// Description: Requests an asynchronous load of a file. The request -// will be queued and served by the asynchronous thread. -// If event_name is nonempty, it is the name of the -// event that will be thrown (with the uint id as its -// single parameter) when the loading is completed later. -// -// The return value is an integer which can be used to -// identify this particular request later to -// fetch_load(), or 0 if there has been an error. -// -// Can be used to request a partial download of a file. -//////////////////////////////////////////////////////////////////// -int Downloader:: -request_download(const string &file_name, const Filename &file_dest, - const string &event_name, int first_byte, - int last_byte, int total_bytes, - bool partial_content, bool sync) { - - nassertr(first_byte <= last_byte && last_byte <= total_bytes, 0); - - PT(DownloaderToken) tok; - if (_threads_enabled) { - - // Make sure we actually are threaded - if (!_threaded) { - downloader_cat.info() - << "Downloader::request_download() - create_thread() was " - << "never called! Calling it now..." << endl; - create_thread(); - } - - // We need to grab the lock in order to signal the condition variable -#ifdef HAVE_IPC - _lock.lock(); -#endif - - if (_token_board->_waiting.is_full()) { - downloader_cat.error() - << "Downloader::request_download() - Too many pending requests\n"; - return 0; - } - - if (downloader_cat.is_debug()) { - downloader_cat.debug() - << "Download requested for file: " << file_name << "\n"; - } - - tok = new DownloaderToken(_next_token++, file_name, file_dest, - event_name, first_byte, last_byte, total_bytes, - partial_content, sync); - _token_board->_waiting.insert(tok); - -#ifdef HAVE_IPC - _request_cond->signal(); - _lock.unlock(); -#endif - - } else { - // If we're not running asynchronously, process the load request - // directly now. - if (_token_board->_waiting.is_full()) { - downloader_cat.error() - << "Downloader::request_download() - Too many pending requests\n"; - return 0; - } - if (downloader_cat.is_debug()) { - downloader_cat.debug() - << "Load requested for file: " << file_name << "\n"; - } - - tok = new DownloaderToken(_next_token++, file_name, file_dest, - event_name, first_byte, last_byte, total_bytes, - partial_content, sync); - _token_board->_waiting.insert(tok); - process_request(); - } - - return tok->_id; -} - -//////////////////////////////////////////////////////////////////// -// Function: Downloader::process_request -// Access: Private -// Description: Serves any requests on the token board, moving them -// to the done queue. -//////////////////////////////////////////////////////////////////// -bool Downloader:: -process_request() { - if (_shutdown) { - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Downloader shutting down...\n"; - return false; - } - - // If there is actually a request token - process it - while (!_token_board->_waiting.is_empty()) { - PT(DownloaderToken) tok = _token_board->_waiting.extract(); - int ret = download(tok->_file_name, tok->_file_dest, tok->_event_name, - tok->_first_byte, tok->_last_byte, tok->_total_bytes, - tok->_partial_content, tok->_sync, tok->_id); - nassertr(tok->_event_name.empty() == false, false); - PT_Event return_event = new Event(tok->_event_name); - return_event->add_parameter(EventParameter((int)tok->_id)); - if (ret == DS_success) { - _token_board->_done.insert(tok); - return_event->add_parameter(EventParameter(DS_success)); - - // Throw a "done" event now. - if (!tok->_event_name.empty()) { - PT_Event done = new Event(tok->_event_name); - done->add_parameter(EventParameter((int)tok->_id)); - throw_event(done); - } - - if (downloader_cat.is_debug()) { - downloader_cat.debug() - << "Downloader::process_request() - downloading complete for " - << tok->_file_name << "\n"; - } - } else { - return_event->add_parameter(EventParameter(ret)); - } - throw_event(return_event); - } - - return true; -} - //////////////////////////////////////////////////////////////////// // Function: Downloader::safe_send // Access: Private @@ -422,12 +195,12 @@ safe_send(int socket, const char *data, int length, long timeout) { int sret = select(socket + 1, NULL, &wset, NULL, &tv); if (sret == 0) { downloader_cat.error() - << "Downloader::safe_send() - select timed out after: " - << timeout << " seconds" << endl; + << "Downloader::safe_send() - select timed out after: " + << timeout << " seconds" << endl; return SS_timeout; } else if (sret == -1) { downloader_cat.error() - << "Downloader::safe_send() - error: " << strerror(errno) << endl; + << "Downloader::safe_send() - error: " << strerror(errno) << endl; return SS_error; } int ret = send(socket, data, length, 0); @@ -435,7 +208,7 @@ safe_send(int socket, const char *data, int length, long timeout) { bytes += ret; else { downloader_cat.error() - << "Downloader::safe_send() - error: " << strerror(errno) << endl; + << "Downloader::safe_send() - error: " << strerror(errno) << endl; return SS_error; } } @@ -443,138 +216,76 @@ safe_send(int socket, const char *data, int length, long timeout) { } //////////////////////////////////////////////////////////////////// -// Function: Downloader::safe_receive +// Function: Downloader::fast_receive // Access: Private // Description: //////////////////////////////////////////////////////////////////// int Downloader:: -safe_receive(int socket, DownloadStatus &status, int length, - long timeout, int &bytes) { - bytes = 0; - if (length == 0) { +fast_receive(int socket, DownloadStatus *status, int rec_size) { + nassertr(status != NULL, FR_error); + if (rec_size <= 0) { downloader_cat.error() - << "Downloader::safe_receive() - requested 0 length receive!" << endl; - return RS_error; + << "Downloader::fast_receive() - Invalid receive size: " << rec_size + << endl; + return FR_error; } + + // Poll the socket with select() to see if there is any data struct timeval tv; - tv.tv_sec = timeout; + tv.tv_sec = 0; tv.tv_usec = 0; fd_set rset; FD_ZERO(&rset); - while (bytes < length) { - FD_SET(socket, &rset); - int sret = select(socket + 1, &rset, NULL, NULL, &tv); - if (sret == 0) { - downloader_cat.warning() - << "Downloader::safe_receive() - select timed out after: " - << timeout << " seconds" << endl; - return RS_timeout; - } else if (sret == -1) { - downloader_cat.error() - << "Downloader::safe_receive() - error: " << strerror(errno) << endl; - return RS_error; - } - int ret = recv(socket, status._next_in, length - bytes, 0); - if (ret > 0) { - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Downloader::safe_receive() - recv() got: " << ret << " bytes" - << endl; - bytes += ret; - status._next_in += ret; - status._bytes_in_buffer += ret; - if (bytes < length) { - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Downloader::safe_receive() - Download stalled" << endl; - _current_attempt_stalled = true; - } - } else if (ret == 0) { - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Downloader::safe_receive() - End of file" << endl; - return RS_eof; - } else { - downloader_cat.error() - << "Downloader::safe_receive() - error: " << strerror(errno) << endl; - return RS_error; - } + FD_SET(socket, &rset); + int sret = select(socket + 1, &rset, NULL, NULL, &tv); + if (sret == 0) { + return FR_no_data; + } else if (sret == -1) { + downloader_cat.error() + << "Downloader::safe_receive() - error: " << strerror(errno) << endl; + return FR_error; } - nassertr(bytes == length, RS_error); - return RS_success; + int ret = recv(socket, status->_next_in, rec_size, 0); + if (ret == 0) { + return FR_eof; + } else if (ret == -1) { + downloader_cat.error() + << "Downloader::fast_receive() - error: " << strerror(errno) << endl; + return FR_error; + } + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::fast_receive() - recv() got: " << ret << " bytes" + << endl; + status->_next_in += ret; + status->_bytes_in_buffer += ret; + status->_total_bytes += ret; + return ret; } //////////////////////////////////////////////////////////////////// -// Function: Downloader::attempt_read -// Access: Private -// Description: +// Function: Downloader::initiate +// Access: Published +// Description: Initiate the download of a complete file from the server. //////////////////////////////////////////////////////////////////// int Downloader:: -attempt_read(int length, DownloadStatus &status, int &bytes_read) { - - bytes_read = 0; - for (int i = 0; i < downloader_timeout_retries; i++) { - - // Ensure we have enough room in the buffer to download length bytes - // If we don't have enough room, write the buffer to disk - if (status._bytes_in_buffer + length > _disk_buffer_size) { - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Downloader::attempt_read() - Flushing buffer" << endl; - - if (write_to_disk(status) == false) - return RS_error; - } - - // Make the request for length bytes - int bytes; - int ans = safe_receive(_socket, status, length, - (long)downloader_timeout, bytes); - bytes_read += bytes; - - switch (ans) { - case RS_error: - case RS_eof: - return ans; - case RS_timeout: - // Try again - break; - case RS_success: - nassertr(bytes == length, RS_error); - return RS_success; - default: - downloader_cat.error() - << "Downloader::attempt_read() - unknown return condition " - << "from safe_receive() : " << ans << endl; - return RS_error; - } - } - - // We timed out on retries consecutive attempts - this is considered - // a true timeout - return RS_timeout; +initiate(const string &file_name, Filename file_dest) { + return initiate(file_name, file_dest, 0, 0, 0, false); } //////////////////////////////////////////////////////////////////// -// Function: Downloader::download -// Access: Private -// Description: +// Function: Downloader::initiate +// Access: Published +// Description: Initiate the download of a file from a server. //////////////////////////////////////////////////////////////////// int Downloader:: -download(const string &file_name, Filename file_dest, - const string &event_name, int first_byte, int last_byte, - int total_bytes, bool partial_content, bool sync, uint id) { +initiate(const string &file_name, Filename file_dest, + int first_byte, int last_byte, int total_bytes, + bool partial_content) { - if (_download_enabled == false) { - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Downloader::download() - downloading is disabled" << endl; - return DS_abort; - } - - // Make sure we are still connected to the server + // Connect to the server if (connect_to_server() == false) - return DS_abort; + return DS_error_connect; // Attempt to open the destination file file_dest.set_binary(); @@ -587,7 +298,7 @@ download(const string &file_name, Filename file_dest, downloader_cat.error() << "Downloader::download() - Error opening file: " << file_dest << " for writing" << endl; - return DS_abort; + return DS_error_write; } // Send an HTTP request for the file to the server @@ -599,7 +310,7 @@ download(const string &file_name, Filename file_dest, if (partial_content == true) { if (downloader_cat.is_debug()) downloader_cat.debug() - << "Downloader::download() - Requesting byte range: " << first_byte + << "Downloader::download() - Requesting byte range: " << first_byte << "-" << last_byte << endl; request += "\012Range: bytes="; stringstream start_stream; @@ -611,191 +322,109 @@ download(const string &file_name, Filename file_dest, if (downloader_cat.is_debug()) downloader_cat.debug() << "Downloader::download() - Sending request:\n" << request << endl; - int send_ret = safe_send(_socket, request.c_str(), outlen, - (long)downloader_timeout); + int send_ret = safe_send(_socket, request.c_str(), outlen, + (long)downloader_timeout); // Handle timeouts on the send if (send_ret == SS_timeout) { - for (int sr = 0; sr < downloader_timeout_retries; sr++) { - send_ret = safe_send(_socket, request.c_str(), outlen, - (long)downloader_timeout); - if (send_ret != SS_timeout) - break; - } - if (send_ret == SS_timeout) { - // We've really timed out - throw an event - downloader_cat.error() - << "Downloader::download() - send timed out after: " - << downloader_timeout_retries << " retries" << endl; - return DS_timeout; - } + downloader_cat.error() + << "Downloader::initiate() - send timed out" << endl; + return DS_error_connect; } if (send_ret == SS_error) - return DS_abort; + return DS_error_connect; // Create a download status to maintain download progress information - DownloadStatus status(_buffer->_buffer, event_name, first_byte, last_byte, - total_bytes, partial_content, id); - bool got_any_data = false; + if (_current_status != NULL) + delete _current_status; + _current_status = new DownloadStatus(_buffer->_buffer, + first_byte, last_byte, total_bytes, + partial_content); - // Loop at the requested frequency until the download completes - for (;;) { - bool resize_buffer = false; + return DS_success; +} - // Ensure that these don't change while we're computing read_size -#ifdef HAVE_IPC - _buffer_lock.lock(); -#endif +//////////////////////////////////////////////////////////////////// +// Function: Downloader::run +// Access: Published +// Description: +//////////////////////////////////////////////////////////////////// +int Downloader:: +run(void) { + if (_current_status == NULL) { + downloader_cat.error() + << "Downloader::run() - Did not call initiate() first" << endl; + return DS_error; + } - nassertr(_frequency > 0, DS_abort); - // If byte rate has changed, recompute read size and write buffer size - if (_new_byte_rate > 0) { - _read_size = (int)ceil(_new_byte_rate * _frequency); - _byte_rate = _new_byte_rate; - _new_byte_rate = 0; - resize_buffer = true; - } + if (connect_to_server() == false) + return DS_error_connect; - // If the disk write frequency has changed, compute a new buffer size - if (_new_disk_write_frequency > 0) { - _disk_write_frequency = _new_disk_write_frequency; - _new_disk_write_frequency = 0; - resize_buffer = true; - } + int ret = DS_ok; + double t0 = _clock.get_real_time(); + if (_tfirst == 0.0) { + _tfirst = t0; + } + if (t0 - _tlast < _frequency) + return DS_ok; - if (resize_buffer == true) { - // Flush the write buffer before resizing it - if (status._bytes_in_buffer > 0) { - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Downloader::download() - Flushing buffer" << endl; - - if (write_to_disk(status) == false) { - downloader_cat.error() - << "Downloader::download() - failed to flush buffer during " - << "resize" << endl; - return DS_abort; - } + // Recompute the buffer size if necessary + if (_recompute_buffer == true) { + + // Flush the current buffer if it holds any data + if (_current_status->_bytes_in_buffer > 0) { + if (write_to_disk(_current_status) == false) { + return DS_error_write; } - - // Resize the buffer - _disk_buffer_size = (_disk_write_frequency * _read_size); - - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Downloader::download() - resizing disk buffer to: " - << _disk_buffer_size << endl; - _buffer.clear(); - downloader_cat.debug() - << "Downloader::download() - buffer cleared" << endl; - _buffer = new Buffer(_disk_buffer_size); - // Update the status with the new buffer - status._buffer = _buffer->_buffer; - status.reset(); - downloader_cat.debug() - << "Downloader::download() - new buffer created" << endl; + ret = DS_write; } -#ifdef HAVE_IPC - _buffer_lock.unlock(); -#endif + // Allocate a new buffer + _buffer.clear(); + _receive_size = (int)ceil(_frequency * _byte_rate); + _disk_buffer_size = _receive_size * _disk_write_frequency; + _buffer = new Buffer(_disk_buffer_size); + _current_status->_buffer = _buffer->_buffer; + _current_status->reset(); - // Attempt to read - int bytes_read; + } else if (_current_status->_bytes_in_buffer + _receive_size > + _disk_buffer_size) { - int ret = attempt_read(_read_size, status, bytes_read); + // Flush the current buffer if the next request would overflow it if (downloader_cat.is_debug()) downloader_cat.debug() - << "Downloader::download() - stalled status: " << _current_attempt_stalled - << endl; + << "Downloader::run() - Flushing buffer" << endl; + if (write_to_disk(_current_status) == false) + return DS_error_write; + ret = DS_write; + } - _last_attempt_stalled = _current_attempt_stalled; - _current_attempt_stalled = false; + // Attempt to receive the bytes from the socket + int bytes_read = fast_receive(_socket, _current_status, _receive_size); + _tlast = _clock.get_real_time(); - if (bytes_read > 0) - got_any_data = true; + // Check for end of file + if (bytes_read == 0) { + if (_got_any_data == true) { + if (_current_status->_bytes_in_buffer > 0) { + if (write_to_disk(_current_status) == false) + return DS_error_write; + ret = DS_write; + } + return DS_success; + } else { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::run() - Got 0 bytes" << endl; + return DS_ok; + } + } else if (bytes_read < 0) { + return DS_error; + } - switch (ret) { - case RS_error: - - downloader_cat.error() - << "Downloader::download() - Error reading from socket: " - << strerror(errno) << endl; - return DS_abort; - - case RS_timeout: - - { - // We've really timed out - throw an event - downloader_cat.error() - << "Downloader::download() - receive timed out after: " - << downloader_timeout_retries << " retries" << endl; - if (bytes_read > 0) { - if (write_to_disk(status) == false) { - downloader_cat.error() - << "Downloader::download() - write to disk failed after " - << "timeout!" << endl; - return DS_abort; - } - } - return DS_timeout; - } - - case RS_success: - - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Downloader::download() - Got: " << bytes_read << " bytes" - << endl; - break; - - case RS_eof: - - { - // We occasionally will get 0 bytes on the first attempt - we - // don't want to treat this as end of file in any case - if (got_any_data == true) { - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Download for: " << file_name << " completed" << endl; - bool ret = true; - if (bytes_read > 0) - ret = write_to_disk(status); - _dest_stream.close(); - - // The "Connection: close" line tells server to close connection - // when the download is complete - _connected = false; - if (ret == false) - return DS_abort; - return DS_success; - } else { - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Downloader::download() - Received 0 bytes" << endl; - } - } - break; - - default: - - downloader_cat.error() - << "Downloader::download() - Unknown return value from " - << "attempt_read() : " << ret << endl; - return DS_abort; - - } // switch(ret) - - // Sleep for the requested frequency - nap(); - - } // for (;;) - - downloader_cat.error() - << "Downloader::download() - Dropped out of for loop without returning!" - << endl; - return DS_abort; + _got_any_data = true; + return ret; } //////////////////////////////////////////////////////////////////// @@ -809,7 +438,7 @@ parse_http_response(const string &resp) { string httpstr = resp.substr(0, ws); if (!(httpstr == "HTTP/1.1")) { downloader_cat.error() - << "Downloader::parse_http_response() - not HTTP/1.1 - got: " + << "Downloader::parse_http_response() - not HTTP/1.1 - got: " << httpstr << endl; return false; } @@ -824,9 +453,9 @@ parse_http_response(const string &resp) { case 202: // Accepted - server may not honor request, though if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Downloader::parse_http_response() - got a 202 Accepted - " - << "server does not guarantee to honor this request" << endl; + downloader_cat.debug() + << "Downloader::parse_http_response() - got a 202 Accepted - " + << "server does not guarantee to honor this request" << endl; return true; case 201: case 203: @@ -839,98 +468,99 @@ parse_http_response(const string &resp) { downloader_cat.error() << "Downloader::parse_http_response() - Invalid response: " << resp << endl; - return false; + return false; } //////////////////////////////////////////////////////////////////// // Function: Downloader::parse_header // Access: Private -// Description: Looks for a valid header. If it finds one, it -// calculates the header length and strips it from -// the download status structure. Function returns false -// on an error condition, otherwise true. +// Description: Looks for a valid header. If it finds one, it +// calculates the header length and strips it from +// the download status structure. Function returns false +// on an error condition, otherwise true. //////////////////////////////////////////////////////////////////// bool Downloader:: -parse_header(DownloadStatus &status) { +parse_header(DownloadStatus *status) { + nassertr(status != NULL, false); - if (status._header_is_complete == true) + if (status->_header_is_complete == true) return true; - if (status._bytes_in_buffer == 0) { + if (status->_bytes_in_buffer == 0) { downloader_cat.error() << "Downloader::parse_header() - Empty buffer!" << endl; return false; } - string bufstr((char *)status._start, status._bytes_in_buffer); + string bufstr((char *)status->_start, status->_bytes_in_buffer); size_t p = 0; while (p < bufstr.length()) { // Server sends out CR LF (\r\n) as newline delimiter size_t nl = bufstr.find("\015\012", p); if (nl == string::npos) { downloader_cat.error() - << "Downloader::parse_header() - No newlines in buffer of " - << "length: " << status._bytes_in_buffer << endl; + << "Downloader::parse_header() - No newlines in buffer of " + << "length: " << status->_bytes_in_buffer << endl; return false; } else if (p == 0 && nl == p) { downloader_cat.error() << "Downloader::parse_header() - Buffer begins with newline!" - << endl; - return false; + << endl; + return false; } string component = bufstr.substr(p, nl - p); // The first line of the response should say whether // got an error or not - if (status._first_line_complete == false) { - status._first_line_complete = true; + if (status->_first_line_complete == false) { + status->_first_line_complete = true; if (parse_http_response(component) == true) { - if (downloader_cat.is_debug()) + if (downloader_cat.is_debug()) downloader_cat.debug() - << "Downloader::parse_header() - Header is valid: " - << component << endl; - status._header_is_valid = true; + << "Downloader::parse_header() - Header is valid: " + << component << endl; + status->_header_is_valid = true; } else { - return false; + return false; } } // Look for content length size_t cpos = component.find(":"); string tline = component.substr(0, cpos); - if (status._partial_content == true && tline == "Content-Length") { + if (status->_partial_content == true && tline == "Content-Length") { tline = component.substr(cpos + 2, string::npos); int server_download_bytes = atoi(tline.c_str()); - int client_download_bytes = status._last_byte - status._first_byte; - if (status._first_byte == 0) - client_download_bytes += 1; + int client_download_bytes = status->_last_byte - status->_first_byte; + if (status->_first_byte == 0) + client_download_bytes += 1; if (client_download_bytes != server_download_bytes) { - downloader_cat.error() - << "Downloader::parse_header() - server size = " - << server_download_bytes << ", client size = " - << client_download_bytes << " (" - << status._last_byte << "-" << status._first_byte << ")" << endl; - return false; + downloader_cat.error() + << "Downloader::parse_header() - server size = " + << server_download_bytes << ", client size = " + << client_download_bytes << " (" + << status->_last_byte << "-" << status->_first_byte << ")" << endl; + return false; } - } + } // Two consecutive (CR LF)s indicates end of HTTP header if (nl == p) { if (downloader_cat.is_debug()) downloader_cat.debug() - << "Downloader::parse_header() - Header is complete" << endl; - status._header_is_complete = true; - + << "Downloader::parse_header() - Header is complete" << endl; + status->_header_is_complete = true; + // Strip the header out of the status buffer int header_length = nl + 2; - status._start += header_length; - status._bytes_in_buffer -= header_length; + status->_start += header_length; + status->_bytes_in_buffer -= header_length; if (downloader_cat.is_debug()) downloader_cat.debug() - << "Downloader::parse_header() - Stripping out header of size: " - << header_length << endl; + << "Downloader::parse_header() - Stripping out header of size: " + << header_length << endl; return true; } @@ -938,12 +568,12 @@ parse_header(DownloadStatus &status) { p = nl + 2; } - if (status._header_is_complete == false) { + if (status->_header_is_complete == false) { if (downloader_cat.is_debug()) downloader_cat.debug() << "Downloader::parse_header() - Reached end of buffer without " - << "successfully parsing the header - buffer size: " - << status._bytes_in_buffer << endl; + << "successfully parsing the header - buffer size: " + << status->_bytes_in_buffer << endl; } return true; @@ -953,46 +583,38 @@ parse_header(DownloadStatus &status) { // Function: Downloader::write_to_disk // Access: Private // Description: Writes a download to disk. If there is a header, -// the pointer and size are adjusted so the header -// is excluded. Function returns false on error -// condition. +// the pointer and size are adjusted so the header +// is excluded. Function returns false on error +// condition. //////////////////////////////////////////////////////////////////// bool Downloader:: -write_to_disk(DownloadStatus &status) { +write_to_disk(DownloadStatus *status) { + nassertr(status != NULL, false); // Ensure the header has been parsed successfully first if (parse_header(status) == false) return false; - if (status._header_is_complete == false) { + if (status->_header_is_complete == false) { downloader_cat.error() << "Downloader::write_to_disk() - Incomplete HTTP header - " << "(or header was larger than download buffer) - " << "try increasing download-buffer-size" << endl; return false; - } - - // Write what we have so far to disk - if (status._bytes_in_buffer > 0) { - if (downloader_cat.is_debug()) - downloader_cat.debug() - << "Downloader::write_to_disk() - Writing " - << status._bytes_in_buffer << " to disk" << endl; - - _dest_stream.write(status._start, status._bytes_in_buffer); - status._total_bytes_written += status._bytes_in_buffer; - - // Throw an event to indicate how many bytes have been written so far - if (!status._event_name.empty()) { - PT_Event write_event = new Event(status._event_name); - write_event->add_parameter(EventParameter((int)status._id)); - write_event->add_parameter(EventParameter(DS_write)); - write_event->add_parameter(EventParameter(status._total_bytes_written)); - throw_event(write_event); - } } - status.reset(); + // Write what we have so far to disk + if (status->_bytes_in_buffer > 0) { + if (downloader_cat.is_debug()) + downloader_cat.debug() + << "Downloader::write_to_disk() - Writing " + << status->_bytes_in_buffer << " to disk" << endl; + + _dest_stream.write(status->_start, status->_bytes_in_buffer); + status->_total_bytes_written += status->_bytes_in_buffer; + } + + status->reset(); return true; } @@ -1003,26 +625,24 @@ write_to_disk(DownloadStatus &status) { // Description: //////////////////////////////////////////////////////////////////// Downloader::DownloadStatus:: -DownloadStatus(char *buffer, const string &event_name, int first_byte, - int last_byte, int total_bytes, bool partial_content, uint id) { +DownloadStatus(char *buffer, int first_byte, int last_byte, + int total_bytes, bool partial_content) { _first_line_complete = false; _header_is_complete = false; _header_is_valid = false; _buffer = buffer; - _event_name = event_name; _first_byte = first_byte; _last_byte = last_byte; _total_bytes = total_bytes; _partial_content = partial_content; - _id = id; reset(); } //////////////////////////////////////////////////////////////////// // Function: Downloader::DownloadStatus::reset -// Access: Public +// Access: Public // Description: Resets the status buffer for more downloading after -// a write. +// a write. //////////////////////////////////////////////////////////////////// void Downloader::DownloadStatus:: reset(void) { diff --git a/panda/src/downloader/downloader.h b/panda/src/downloader/downloader.h index fa1a44f308..8308b3fbc0 100644 --- a/panda/src/downloader/downloader.h +++ b/panda/src/downloader/downloader.h @@ -12,9 +12,9 @@ #include #include #include -#include #include -#include "asyncUtility.h" +#include +#include #if defined(WIN32_VC) #include @@ -24,54 +24,47 @@ #include #endif -class DownloaderToken; - //////////////////////////////////////////////////////////////////// // Class : Downloader // Description : //////////////////////////////////////////////////////////////////// -class EXPCL_PANDAEXPRESS Downloader : public AsyncUtility { +class EXPCL_PANDAEXPRESS Downloader { PUBLISHED: enum DownloadCode { + DS_ok = 3, DS_write = 2, DS_success = 1, - DS_abort = -1, - DS_timeout = -2 + DS_error = -1, + DS_error_write = -2, + DS_error_connect = -3, }; + Downloader(void); - //Downloader(PT(Buffer) buffer); virtual ~Downloader(void); bool connect_to_server(const string &name, uint port=80); void disconnect_from_server(void); - int request_sync_download(const string &file_name, const Filename &file_dest, - const string &event_name); - int request_sync_download(const string &file_name, const Filename &file_dest, - const string &event_name, int first_byte, - int last_byte, int total_bytes, - bool partial_content = true); - int request_download(const string &file_name, const Filename &file_dest, - const string &event_name, bool sync = false); - int request_download(const string &file_name, const Filename &file_dest, - const string &event_name, int first_byte, - int last_byte, int total_bytes, - bool partial_content = true, bool sync = false); + int initiate(const string &file_name, Filename file_dest); + int initiate(const string &file_name, Filename file_dest, + int first_byte, int last_byte, int total_bytes, + bool partial_content = true); + int run(void); + INLINE void set_frequency(float frequency); + INLINE float get_frequency(void) const; INLINE void set_byte_rate(float bytes); INLINE float get_byte_rate(void) const; INLINE void set_disk_write_frequency(int frequency); INLINE int get_disk_write_frequency(void) const; - INLINE void enable_download(bool val); - INLINE bool is_download_enabled(void) const; - INLINE bool get_last_attempt_stalled(void) const; + INLINE int get_bytes_written(void) const; + INLINE float get_bytes_per_second(void) const; private: class DownloadStatus { public: - DownloadStatus(char *buffer, const string &event_name, int first_byte, - int last_byte, int total_bytes, bool partial_content, - uint id); + DownloadStatus(char *buffer, int first_byte, int last_byte, + int total_bytes, bool partial_content); void reset(void); public: @@ -81,55 +74,44 @@ private: char *_start; char *_next_in; int _bytes_in_buffer; - string _event_name; int _total_bytes_written; int _first_byte; int _last_byte; int _total_bytes; bool _partial_content; - uint _id; char *_buffer; }; - void init(); - int download(const string &file_name, Filename file_dest, - const string &event_name, int first_byte, - int last_byte, int total_bytes, bool partial_content, - bool sync, uint id); - virtual bool process_request(void); - bool parse_header(DownloadStatus &status); - bool write_to_disk(DownloadStatus &status); + INLINE void recompute_buffer(void); + bool connect_to_server(void); int safe_send(int socket, const char *data, int length, long timeout); - int safe_receive(int socket, DownloadStatus &status, int length, - long timeout, int &bytes); + int fast_receive(int socket, DownloadStatus *status, int rec_size); bool parse_http_response(const string &resp); - int attempt_read(int length, DownloadStatus &status, int &bytes_read); - - typedef TokenBoard DownloaderTokenBoard; - DownloaderTokenBoard *_token_board; + bool parse_header(DownloadStatus *status); + bool write_to_disk(DownloadStatus *status); +private: bool _connected; - -#ifdef HAVE_IPC - mutex _buffer_lock; -#endif - int _socket; - PT(Buffer) _buffer; - int _disk_write_frequency; - int _new_disk_write_frequency; - float _byte_rate; - float _new_byte_rate; - int _read_size; - bool _download_enabled; - ofstream _dest_stream; - int _disk_buffer_size; - bool _last_attempt_stalled; - bool _current_attempt_stalled; - string _server_name; struct sockaddr_in _sin; + + PT(Buffer) _buffer; + int _disk_write_frequency; + float _frequency; + float _byte_rate; + int _receive_size; + int _disk_buffer_size; + ofstream _dest_stream; + bool _recompute_buffer; + + DownloadStatus *_current_status; + bool _got_any_data; + + double _tlast; + double _tfirst; + ClockObject _clock; }; #include "downloader.I"