integrate thread_yield in appropriate places

This commit is contained in:
David Rose 2007-06-26 13:01:16 +00:00
parent ac0b6ea475
commit fe97acf7cb
30 changed files with 245 additions and 57 deletions

View File

@ -148,4 +148,18 @@ void (*global_mark_pointer)(void *ptr, size_t size, ReferenceCount *ref_ptr) = &
#else #else
#endif #endif // USE_MEMORY_*
#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
static void
default_thread_yield() {
}
static void
default_thread_consider_yield() {
}
void (*global_thread_yield)() = default_thread_yield;
void (*global_thread_consider_yield)() = default_thread_consider_yield;
#endif // HAVE_THREADS && SIMPLE_THREADS

View File

@ -197,6 +197,28 @@ INLINE void operator delete[](void *ptr) {
#endif // REDEFINE_GLOBAL_OPERATOR_NEW #endif // REDEFINE_GLOBAL_OPERATOR_NEW
#endif // USE_MEMORY_NOWRAPPERS #endif // USE_MEMORY_NOWRAPPERS
#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
// We need another forward-reference function to allow low-level code
// to cooperatively yield the timeslice, in SIMPLE_THREADS mode.
extern EXPCL_DTOOL void (*global_thread_yield)();
extern EXPCL_DTOOL void (*global_thread_consider_yield)();
INLINE void thread_yield() {
(*global_thread_yield)();
}
INLINE void thread_consider_yield() {
(*global_thread_consider_yield)();
}
#else
INLINE void thread_yield() {
}
INLINE void thread_consider_yield() {
}
#endif // HAVE_THREADS && SIMPLE_THREADS
#if defined(USE_TAU) && defined(WIN32) #if defined(USE_TAU) && defined(WIN32)
// Hack around tau's lack of DLL export declarations for Profiler class. // Hack around tau's lack of DLL export declarations for Profiler class.
extern EXPCL_DTOOL bool __tau_shutdown; extern EXPCL_DTOOL bool __tau_shutdown;

View File

@ -1556,6 +1556,7 @@ scan_directory(vector_string &contents) const {
} }
do { do {
thread_consider_yield();
string filename = find_data.cFileName; string filename = find_data.cFileName;
if (filename != "." && filename != "..") { if (filename != "." && filename != "..") {
contents.push_back(filename); contents.push_back(filename);
@ -1588,6 +1589,7 @@ scan_directory(vector_string &contents) const {
struct dirent *d; struct dirent *d;
d = readdir(root); d = readdir(root);
while (d != (struct dirent *)NULL) { while (d != (struct dirent *)NULL) {
thread_consider_yield();
if (d->d_name[0] != '.') { if (d->d_name[0] != '.') {
contents.push_back(d->d_name); contents.push_back(d->d_name);
} }

View File

@ -186,6 +186,7 @@ open_read(istream *source, bool owns_source, const string &password) {
_read_overflow_buffer = new unsigned char[_read_block_size]; _read_overflow_buffer = new unsigned char[_read_block_size];
_in_read_overflow_buffer = 0; _in_read_overflow_buffer = 0;
thread_consider_yield();
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
@ -300,6 +301,7 @@ open_write(ostream *dest, bool owns_dest, const string &password) {
sw.append_data(iv, iv_length); sw.append_data(iv, iv_length);
_write_valid = true; _write_valid = true;
thread_consider_yield();
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
@ -318,6 +320,7 @@ close_write() {
unsigned char *write_buffer = (unsigned char *)alloca(_write_block_size); unsigned char *write_buffer = (unsigned char *)alloca(_write_block_size);
int bytes_written = 0; int bytes_written = 0;
EVP_EncryptFinal(&_write_ctx, write_buffer, &bytes_written); EVP_EncryptFinal(&_write_ctx, write_buffer, &bytes_written);
thread_consider_yield();
_dest->write((const char *)write_buffer, bytes_written); _dest->write((const char *)write_buffer, bytes_written);
@ -463,6 +466,7 @@ read_chars(char *start, size_t length) {
_read_valid = false; _read_valid = false;
} }
} }
thread_consider_yield();
} while (bytes_read == 0); } while (bytes_read == 0);
@ -504,6 +508,7 @@ write_chars(const char *start, size_t length) {
prc_cat.error() prc_cat.error()
<< "Error encrypting stream.\n"; << "Error encrypting stream.\n";
} }
thread_consider_yield();
_dest->write((const char *)write_buffer, bytes_written); _dest->write((const char *)write_buffer, bytes_written);
} }
} }

View File

@ -756,6 +756,7 @@ read_callback(void *handle, void *buffer, unsigned int size_bytes,
istream *str = (istream *)handle; istream *str = (istream *)handle;
str->read((char *)buffer, size_bytes); str->read((char *)buffer, size_bytes);
(*bytes_read) = str->gcount(); (*bytes_read) = str->gcount();
thread_consider_yield();
if (str->eof()) { if (str->eof()) {
if ((*bytes_read) == 0) { if ((*bytes_read) == 0) {

View File

@ -170,6 +170,7 @@ underflow() {
// BIO_read might return -1 or -2 on eof or error, so we have to // BIO_read might return -1 or -2 on eof or error, so we have to
// allow for negative numbers. // allow for negative numbers.
int read_count = BIO_read(*_source, gptr(), buffer_size); int read_count = BIO_read(*_source, gptr(), buffer_size);
thread_consider_yield();
if (read_count != (int)num_bytes) { if (read_count != (int)num_bytes) {
// Oops, we didn't read what we thought we would. // Oops, we didn't read what we thought we would.
@ -239,6 +240,7 @@ write_chars(const char *start, size_t length) {
downloader_cat.spam() downloader_cat.spam()
<< "wrote " << write_count << " bytes.\n"; << "wrote " << write_count << " bytes.\n";
} }
thread_consider_yield();
while (write_count != (int)(length - wrote_so_far)) { while (write_count != (int)(length - wrote_so_far)) {
if (write_count <= 0) { if (write_count <= 0) {
_is_closed = !BIO_should_retry(*_source); _is_closed = !BIO_should_retry(*_source);
@ -275,6 +277,7 @@ write_chars(const char *start, size_t length) {
downloader_cat.spam() downloader_cat.spam()
<< "continued, wrote " << write_count << " bytes.\n"; << "continued, wrote " << write_count << " bytes.\n";
} }
thread_consider_yield();
} }
} }

View File

@ -666,7 +666,8 @@ send_extra_header(const string &key, const string &value) {
INLINE bool HTTPChannel:: INLINE bool HTTPChannel::
get_document(const DocumentSpec &url) { get_document(const DocumentSpec &url) {
begin_request(HTTPEnum::M_get, url, string(), false, 0, 0); begin_request(HTTPEnum::M_get, url, string(), false, 0, 0);
run(); while (run()) {
}
return is_valid(); return is_valid();
} }
@ -683,7 +684,8 @@ get_document(const DocumentSpec &url) {
INLINE bool HTTPChannel:: INLINE bool HTTPChannel::
get_subdocument(const DocumentSpec &url, size_t first_byte, size_t last_byte) { get_subdocument(const DocumentSpec &url, size_t first_byte, size_t last_byte) {
begin_request(HTTPEnum::M_get, url, string(), false, first_byte, last_byte); begin_request(HTTPEnum::M_get, url, string(), false, first_byte, last_byte);
run(); while (run()) {
}
return is_valid(); return is_valid();
} }
@ -699,7 +701,8 @@ get_subdocument(const DocumentSpec &url, size_t first_byte, size_t last_byte) {
INLINE bool HTTPChannel:: INLINE bool HTTPChannel::
get_header(const DocumentSpec &url) { get_header(const DocumentSpec &url) {
begin_request(HTTPEnum::M_head, url, string(), false, 0, 0); begin_request(HTTPEnum::M_head, url, string(), false, 0, 0);
run(); while (run()) {
}
return is_valid(); return is_valid();
} }
@ -712,7 +715,8 @@ get_header(const DocumentSpec &url) {
INLINE bool HTTPChannel:: INLINE bool HTTPChannel::
post_form(const DocumentSpec &url, const string &body) { post_form(const DocumentSpec &url, const string &body) {
begin_request(HTTPEnum::M_post, url, body, false, 0, 0); begin_request(HTTPEnum::M_post, url, body, false, 0, 0);
run(); while (run()) {
}
return is_valid(); return is_valid();
} }
@ -725,7 +729,8 @@ post_form(const DocumentSpec &url, const string &body) {
INLINE bool HTTPChannel:: INLINE bool HTTPChannel::
put_document(const DocumentSpec &url, const string &body) { put_document(const DocumentSpec &url, const string &body) {
begin_request(HTTPEnum::M_put, url, body, false, 0, 0); begin_request(HTTPEnum::M_put, url, body, false, 0, 0);
run(); while (run()) {
}
return is_valid(); return is_valid();
} }
@ -737,7 +742,8 @@ put_document(const DocumentSpec &url, const string &body) {
INLINE bool HTTPChannel:: INLINE bool HTTPChannel::
delete_document(const DocumentSpec &url) { delete_document(const DocumentSpec &url) {
begin_request(HTTPEnum::M_delete, url, string(), false, 0, 0); begin_request(HTTPEnum::M_delete, url, string(), false, 0, 0);
run(); while (run()) {
}
return is_valid(); return is_valid();
} }
@ -751,7 +757,8 @@ delete_document(const DocumentSpec &url) {
INLINE bool HTTPChannel:: INLINE bool HTTPChannel::
get_trace(const DocumentSpec &url) { get_trace(const DocumentSpec &url) {
begin_request(HTTPEnum::M_trace, url, string(), false, 0, 0); begin_request(HTTPEnum::M_trace, url, string(), false, 0, 0);
run(); while (run()) {
}
return is_valid(); return is_valid();
} }
@ -770,7 +777,8 @@ get_trace(const DocumentSpec &url) {
INLINE bool HTTPChannel:: INLINE bool HTTPChannel::
connect_to(const DocumentSpec &url) { connect_to(const DocumentSpec &url) {
begin_request(HTTPEnum::M_connect, url, string(), false, 0, 0); begin_request(HTTPEnum::M_connect, url, string(), false, 0, 0);
run(); while (run()) {
}
return is_connection_ready(); return is_connection_ready();
} }
@ -784,7 +792,8 @@ connect_to(const DocumentSpec &url) {
INLINE bool HTTPChannel:: INLINE bool HTTPChannel::
get_options(const DocumentSpec &url) { get_options(const DocumentSpec &url) {
begin_request(HTTPEnum::M_options, url, string(), false, 0, 0); begin_request(HTTPEnum::M_options, url, string(), false, 0, 0);
run(); while (run()) {
}
return is_valid(); return is_valid();
} }

View File

@ -58,8 +58,16 @@ HTTPChannel(HTTPClient *client) :
_seconds_per_update = downloader_frequency; _seconds_per_update = downloader_frequency;
_max_updates_per_second = 1.0f / _seconds_per_update; _max_updates_per_second = 1.0f / _seconds_per_update;
_bytes_per_update = int(_max_bytes_per_second * _seconds_per_update); _bytes_per_update = int(_max_bytes_per_second * _seconds_per_update);
// _nonblocking is true if the socket is actually in non-blocking mode.
_nonblocking = false; _nonblocking = false;
// _wanted_nonblocking is true if the user specifically requested
// one of the non-blocking interfaces. It is false if the socket is
// only incidentally non-blocking (for instance, because
// SIMPLE_THREADS is on).
_wanted_nonblocking = false;
_want_ssl = false; _want_ssl = false;
_proxy_serves_document = false; _proxy_serves_document = false;
_proxy_tunnel_now = false; _proxy_tunnel_now = false;
@ -353,11 +361,12 @@ run() {
} }
if (_started_download) { if (_started_download) {
if (_nonblocking && _download_throttle) { if (_wanted_nonblocking && _download_throttle) {
double now = TrueClock::get_global_ptr()->get_short_time(); double now = TrueClock::get_global_ptr()->get_short_time();
double elapsed = now - _last_run_time; double elapsed = now - _last_run_time;
if (elapsed < _seconds_per_update) { if (elapsed < _seconds_per_update) {
// Come back later. // Come back later.
thread_yield();
return true; return true;
} }
int num_potential_updates = (int)(elapsed / _seconds_per_update); int num_potential_updates = (int)(elapsed / _seconds_per_update);
@ -370,16 +379,22 @@ run() {
<< _bytes_requested << "\n"; << _bytes_requested << "\n";
} }
} }
bool repeat_later;
switch (_download_dest) { switch (_download_dest) {
case DD_none: case DD_none:
return false; // We're done. repeat_later = false; // We're done.
case DD_file: case DD_file:
return run_download_to_file(); repeat_later = run_download_to_file();
case DD_ram: case DD_ram:
return run_download_to_ram(); repeat_later = run_download_to_ram();
} }
if (repeat_later) {
thread_yield();
}
return repeat_later;
} }
if (downloader_cat.is_spam()) { if (downloader_cat.is_spam()) {
@ -399,7 +414,7 @@ run() {
// reestablish the connection if it has been dropped. // reestablish the connection if it has been dropped.
if (_bio.is_null() && _state != S_try_next_proxy) { if (_bio.is_null() && _state != S_try_next_proxy) {
if (_connect_count > http_max_connect_count) { if (_connect_count > http_max_connect_count) {
// Too many connection attempts, just give up. We should // Too many connection attempts; just give up. We should
// never trigger this failsafe, since the code in each // never trigger this failsafe, since the code in each
// individual case has similar logic to prevent more than two // individual case has similar logic to prevent more than two
// consecutive lost connections. // consecutive lost connections.
@ -534,6 +549,7 @@ run() {
// We've reached our terminal state. // We've reached our terminal state.
return reached_done_state(); return reached_done_state();
} }
thread_consider_yield();
} while (!repeat_later || _bio.is_null()); } while (!repeat_later || _bio.is_null());
if (downloader_cat.is_spam()) { if (downloader_cat.is_spam()) {
@ -541,6 +557,8 @@ run() {
<< "later run(), _state = " << _state << "later run(), _state = " << _state
<< ", _done_state = " << _done_state << "\n"; << ", _done_state = " << _done_state << "\n";
} }
thread_yield();
return true; return true;
} }
@ -631,7 +649,7 @@ download_to_file(const Filename &filename, bool subdocument_resumes) {
_download_dest = DD_file; _download_dest = DD_file;
if (_nonblocking) { if (_wanted_nonblocking) {
// In nonblocking mode, we can't start the download yet; that will // In nonblocking mode, we can't start the download yet; that will
// be done later as run() is called. // be done later as run() is called.
return true; return true;
@ -643,7 +661,8 @@ download_to_file(const Filename &filename, bool subdocument_resumes) {
return false; return false;
} }
run(); while (run()) {
}
return is_download_complete(); return is_download_complete();
} }
@ -687,7 +706,7 @@ download_to_ram(Ramfile *ramfile, bool subdocument_resumes) {
_download_dest = DD_ram; _download_dest = DD_ram;
_subdocument_resumes = (subdocument_resumes && _first_byte_delivered != 0); _subdocument_resumes = (subdocument_resumes && _first_byte_delivered != 0);
if (_nonblocking) { if (_wanted_nonblocking) {
// In nonblocking mode, we can't start the download yet; that will // In nonblocking mode, we can't start the download yet; that will
// be done later as run() is called. // be done later as run() is called.
return true; return true;
@ -699,7 +718,8 @@ download_to_ram(Ramfile *ramfile, bool subdocument_resumes) {
return false; return false;
} }
run(); while (run()) {
}
return is_download_complete(); return is_download_complete();
} }
@ -2071,7 +2091,7 @@ bool HTTPChannel::
run_download_to_file() { run_download_to_file() {
nassertr(_body_stream != (ISocketStream *)NULL, false); nassertr(_body_stream != (ISocketStream *)NULL, false);
bool do_throttle = _nonblocking && _download_throttle; bool do_throttle = _wanted_nonblocking && _download_throttle;
static const size_t buffer_size = 1024; static const size_t buffer_size = 1024;
char buffer[buffer_size]; char buffer[buffer_size];
@ -2146,7 +2166,7 @@ run_download_to_ram() {
nassertr(_body_stream != (ISocketStream *)NULL, false); nassertr(_body_stream != (ISocketStream *)NULL, false);
nassertr(_download_to_ramfile != (Ramfile *)NULL, false); nassertr(_download_to_ramfile != (Ramfile *)NULL, false);
bool do_throttle = _nonblocking && _download_throttle; bool do_throttle = _wanted_nonblocking && _download_throttle;
static const size_t buffer_size = 1024; static const size_t buffer_size = 1024;
char buffer[buffer_size]; char buffer[buffer_size];
@ -2212,6 +2232,13 @@ begin_request(HTTPEnum::Method method, const DocumentSpec &url,
size_t first_byte, size_t last_byte) { size_t first_byte, size_t last_byte) {
reset_for_new_request(); reset_for_new_request();
_wanted_nonblocking = nonblocking;
#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
// In the presence of SIMPLE_THREADS, we always use non-blocking
// I/O. We simulate blocking by yielding the thread.
nonblocking = true;
#endif
// Get the set of proxies that are appropriate for this URL. // Get the set of proxies that are appropriate for this URL.
_proxies.clear(); _proxies.clear();
_proxy_next_index = 0; _proxy_next_index = 0;

View File

@ -323,6 +323,7 @@ private:
double _seconds_per_update; double _seconds_per_update;
int _bytes_per_update; int _bytes_per_update;
bool _nonblocking; bool _nonblocking;
bool _wanted_nonblocking;
string _send_extra_headers; string _send_extra_headers;
DocumentSpec _document_spec; DocumentSpec _document_spec;

View File

@ -26,6 +26,7 @@
#include "config_egg.h" #include "config_egg.h"
#include "eggGroupNode.h" #include "eggGroupNode.h"
#include "dcast.h" #include "dcast.h"
#include "thread.h"
#include <stdlib.h> #include <stdlib.h>
@ -325,6 +326,8 @@ do_mesh() {
// Finally, do the longer strips. // Finally, do the longer strips.
mesh_list(_strips); mesh_list(_strips);
Thread::consider_yield();
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////

View File

@ -14,6 +14,7 @@
#include "indent.h" #include "indent.h"
#include "pnotify.h" #include "pnotify.h"
#include "pmutex.h" #include "pmutex.h"
#include "thread.h"
#include <math.h> #include <math.h>
@ -188,6 +189,7 @@ input_chars(char *buffer, int &result, int max_size) {
// End of file or I/O error. // End of file or I/O error.
result = 0; result = 0;
} }
Thread::consider_yield();
} }
#undef YY_INPUT #undef YY_INPUT
#define YY_INPUT(buffer, result, max_size) input_chars(buffer, result, max_size) #define YY_INPUT(buffer, result, max_size) input_chars(buffer, result, max_size)

View File

@ -92,6 +92,7 @@
#include "transformBlend.h" #include "transformBlend.h"
#include "sparseArray.h" #include "sparseArray.h"
#include "bitArray.h" #include "bitArray.h"
#include "thread.h"
#include <ctype.h> #include <ctype.h>
#include <algorithm> #include <algorithm>
@ -2321,6 +2322,7 @@ make_vertex_data(const EggRenderState *render_state,
(VertexPoolData::value_type(vpt, vertex_data)).second; (VertexPoolData::value_type(vpt, vertex_data)).second;
nassertr(inserted, vertex_data); nassertr(inserted, vertex_data);
Thread::consider_yield();
return vertex_data; return vertex_data;
} }

View File

@ -284,6 +284,7 @@ read_file(istream *in, string &result) {
in->read(buffer, buffer_size); in->read(buffer, buffer_size);
size_t count = in->gcount(); size_t count = in->gcount();
while (count != 0) { while (count != 0) {
thread_consider_yield();
result_vec.insert(result_vec.end(), buffer, buffer + count); result_vec.insert(result_vec.end(), buffer, buffer + count);
in->read(buffer, buffer_size); in->read(buffer, buffer_size);
count = in->gcount(); count = in->gcount();
@ -309,6 +310,7 @@ read_file(istream *in, string &result, size_t max_bytes) {
in->read(buffer, min(buffer_size, max_bytes)); in->read(buffer, min(buffer_size, max_bytes));
size_t count = in->gcount(); size_t count = in->gcount();
while (count != 0) { while (count != 0) {
thread_consider_yield();
nassertr(count <= max_bytes, false); nassertr(count <= max_bytes, false);
result_vec.insert(result_vec.end(), buffer, buffer + count); result_vec.insert(result_vec.end(), buffer, buffer + count);
max_bytes -= count; max_bytes -= count;

View File

@ -86,6 +86,7 @@ open_read(istream *source, bool owns_source) {
show_zlib_error("inflateInit", result, _z_source); show_zlib_error("inflateInit", result, _z_source);
close_read(); close_read();
} }
thread_consider_yield();
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
@ -101,6 +102,7 @@ close_read() {
if (result < 0) { if (result < 0) {
show_zlib_error("inflateEnd", result, _z_source); show_zlib_error("inflateEnd", result, _z_source);
} }
thread_consider_yield();
if (_owns_source) { if (_owns_source) {
delete _source; delete _source;
@ -130,6 +132,7 @@ open_write(ostream *dest, bool owns_dest, int compression_level) {
show_zlib_error("deflateInit", result, _z_dest); show_zlib_error("deflateInit", result, _z_dest);
close_write(); close_write();
} }
thread_consider_yield();
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
@ -148,6 +151,7 @@ close_write() {
if (result < 0) { if (result < 0) {
show_zlib_error("deflateEnd", result, _z_dest); show_zlib_error("deflateEnd", result, _z_dest);
} }
thread_consider_yield();
if (_owns_dest) { if (_owns_dest) {
delete _dest; delete _dest;
@ -260,6 +264,7 @@ read_chars(char *start, size_t length) {
_z_source.avail_in = read_count; _z_source.avail_in = read_count;
} }
int result = inflate(&_z_source, flush); int result = inflate(&_z_source, flush);
thread_consider_yield();
size_t bytes_read = length - _z_source.avail_out; size_t bytes_read = length - _z_source.avail_out;
if (result == Z_STREAM_END) { if (result == Z_STREAM_END) {
@ -302,6 +307,7 @@ write_chars(const char *start, size_t length, int flush) {
if (result < 0 && result != Z_BUF_ERROR) { if (result < 0 && result != Z_BUF_ERROR) {
show_zlib_error("deflate", result, _z_dest); show_zlib_error("deflate", result, _z_dest);
} }
thread_consider_yield();
while (_z_dest.avail_in != 0) { while (_z_dest.avail_in != 0) {
if (_z_dest.avail_out != compress_buffer_size) { if (_z_dest.avail_out != compress_buffer_size) {
@ -313,6 +319,7 @@ write_chars(const char *start, size_t length, int flush) {
if (result < 0) { if (result < 0) {
show_zlib_error("deflate", result, _z_dest); show_zlib_error("deflate", result, _z_dest);
} }
thread_consider_yield();
} }
while (_z_dest.avail_out != compress_buffer_size) { while (_z_dest.avail_out != compress_buffer_size) {
@ -323,6 +330,7 @@ write_chars(const char *start, size_t length, int flush) {
if (result < 0 && result != Z_BUF_ERROR) { if (result < 0 && result != Z_BUF_ERROR) {
show_zlib_error("deflate", result, _z_dest); show_zlib_error("deflate", result, _z_dest);
} }
thread_consider_yield();
} }
} }

View File

@ -93,7 +93,13 @@ VertexDataSaveFile(const Filename &directory, const string &prefix,
#else #else
// Posix case. // Posix case.
_fd = open(os_specific.c_str(), O_RDWR | O_CREAT, 0666); int flags = O_RDWR | O_CREAT;
#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS)
// In SIMPLE_THREADS mode, we use non-blocking I/O.
flags |= O_NONBLOCK;
#endif
_fd = open(os_specific.c_str(), flags, 0666);
if (_fd == -1) { if (_fd == -1) {
// Couldn't open the file: permissions problem or bad directory. // Couldn't open the file: permissions problem or bad directory.
if (!_filename.exists()) { if (!_filename.exists()) {
@ -210,11 +216,21 @@ write_data(const unsigned char *data, size_t size, bool compressed) {
return false; return false;
} }
ssize_t result = ::write(_fd, data, size); while (size > 0) {
if (result != (ssize_t)size) { ssize_t result = ::write(_fd, data, size);
gobj_cat.error() if (result == -1) {
<< "Error writing " << size << " bytes to save file. Disk full?\n"; if (errno == EAGAIN) {
return NULL; Thread::force_yield();
} else {
gobj_cat.error()
<< "Error writing " << size << " bytes to save file. Disk full?\n";
return NULL;
}
}
Thread::consider_yield();
data += result;
size -= result;
} }
#endif // _WIN32 #endif // _WIN32
@ -263,11 +279,21 @@ read_data(unsigned char *data, size_t size, VertexDataSaveBlock *block) {
<< "Error seeking to position " << block->get_start() << " in save file.\n"; << "Error seeking to position " << block->get_start() << " in save file.\n";
return false; return false;
} }
ssize_t result = read(_fd, data, size); while (size > 0) {
if (result != (ssize_t)size) { ssize_t result = read(_fd, data, size);
gobj_cat.error() if (result == -1) {
<< "Error reading " << size << " bytes from save file.\n"; if (errno == EAGAIN) {
return false; Thread::force_yield();
} else {
gobj_cat.error()
<< "Error reading " << size << " bytes from save file.\n";
return false;
}
}
Thread::consider_yield();
data += result;
size -= result;
} }
#endif // _WIN32 #endif // _WIN32

View File

@ -102,7 +102,7 @@ ConnectionReader::
ConnectionReader(ConnectionManager *manager, int num_threads) : ConnectionReader(ConnectionManager *manager, int num_threads) :
_manager(manager) _manager(manager)
{ {
#ifndef HAVE_THREADS #if !defined(HAVE_THREADS) || defined(SIMPLE_THREADS)
#ifndef NDEBUG #ifndef NDEBUG
if (num_threads != 0) { if (num_threads != 0) {
net_cat.error() net_cat.error()

View File

@ -61,11 +61,13 @@ ConnectionWriter::
ConnectionWriter(ConnectionManager *manager, int num_threads) : ConnectionWriter(ConnectionManager *manager, int num_threads) :
_manager(manager) _manager(manager)
{ {
#ifndef HAVE_THREADS #if !defined(HAVE_THREADS) || defined(SIMPLE_THREADS)
// Although this code is written to use thread-locking primitives // Although this code is written to use thread-locking primitives
// regardless of the definition of HAVE_THREADS, it is not safe to // regardless of the definition of HAVE_THREADS, it is not safe to
// spawn multiple threads when HAVE_THREADS is not true, since we // spawn multiple threads when HAVE_THREADS is not true, since we
// might be using a non-thread-safe malloc scheme. // might be using a non-thread-safe malloc scheme. Also, there is
// no point in using threads for this kind of I/O if SIMPLE_THREADS
// is defined.
#ifndef NDEBUG #ifndef NDEBUG
if (num_threads != 0) { if (num_threads != 0) {
net_cat.error() net_cat.error()

View File

@ -25,6 +25,7 @@
#include "plist.h" #include "plist.h"
#include "pmap.h" #include "pmap.h"
#include "geomNode.h" #include "geomNode.h"
#include "thread.h"
PStatCollector SceneGraphReducer::_flatten_collector("*:Flatten:flatten"); PStatCollector SceneGraphReducer::_flatten_collector("*:Flatten:flatten");
PStatCollector SceneGraphReducer::_apply_collector("*:Flatten:apply"); PStatCollector SceneGraphReducer::_apply_collector("*:Flatten:apply");
@ -771,7 +772,8 @@ r_collect_vertex_data(PandaNode *node, int collect_bits,
r_collect_vertex_data(children.get_child(i), collect_bits, transformer); r_collect_vertex_data(children.get_child(i), collect_bits, transformer);
} }
} }
Thread::consider_yield();
return num_created; return num_created;
} }
@ -840,6 +842,7 @@ r_unify(PandaNode *node, int max_indices) {
for (int i = 0; i < num_children; ++i) { for (int i = 0; i < num_children; ++i) {
r_unify(children.get_child(i), max_indices); r_unify(children.get_child(i), max_indices);
} }
Thread::consider_yield();
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////

View File

@ -41,6 +41,11 @@ ThreadSimpleManager() {
_current_thread = NULL; _current_thread = NULL;
_clock = TrueClock::get_global_ptr(); _clock = TrueClock::get_global_ptr();
_waiting_for_exit = NULL; _waiting_for_exit = NULL;
// Install these global pointers so very low-level code (code
// defined before the pipeline directory) can yield when necessary.
global_thread_yield = &Thread::force_yield;
global_thread_consider_yield = &Thread::consider_yield;
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
@ -53,7 +58,9 @@ ThreadSimpleManager() {
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
void ThreadSimpleManager:: void ThreadSimpleManager::
enqueue_ready(ThreadSimpleImpl *thread) { enqueue_ready(ThreadSimpleImpl *thread) {
_ready.push_back(thread); // We actually add it to _next_ready, so that we can tell when we
// have processed every thread in a given epoch.
_next_ready.push_back(thread);
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
@ -353,9 +360,26 @@ choose_next_context() {
// Choose a new thread to execute. // Choose a new thread to execute.
while (_ready.empty()) { while (_ready.empty()) {
// No threads are ready. They must all be sleeping. if (!_next_ready.empty()) {
if (_sleeping.empty()) { // We've finished an epoch.
// No threads at all! _ready.swap(_next_ready);
system_yield();
} else if (!_sleeping.empty()) {
// All threads are sleeping.
double wait = _sleeping.front()->_start_time - now;
if (wait > 0.0) {
if (thread_cat.is_debug()) {
thread_cat.debug()
<< "Sleeping all threads " << wait << " seconds\n";
}
system_sleep(wait);
}
now = get_current_time();
wake_sleepers(now);
} else {
// No threads are ready!
if (!_blocked.empty()) { if (!_blocked.empty()) {
thread_cat.error() thread_cat.error()
<< "Deadlock! All threads blocked.\n"; << "Deadlock! All threads blocked.\n";
@ -372,24 +396,13 @@ choose_next_context() {
break; break;
} }
// No threads are queued anywhere. This is kind of an error, // No threads are queued anywhere. This is some kind of
// since normally the main thread, at least, should be queued // internal error, since normally the main thread, at least,
// somewhere. // should be queued somewhere.
thread_cat.error() thread_cat.error()
<< "All threads disappeared!\n"; << "All threads disappeared!\n";
exit(0); exit(0);
} }
double wait = _sleeping.front()->_start_time - now;
if (wait > 0.0) {
if (thread_cat.is_debug()) {
thread_cat.debug()
<< "Sleeping all threads " << wait << " seconds\n";
}
system_sleep(wait);
}
now = get_current_time();
wake_sleepers(now);
} }
_current_thread = _ready.front(); _current_thread = _ready.front();
@ -454,6 +467,25 @@ system_sleep(double seconds) {
#endif // WIN32 #endif // WIN32
} }
////////////////////////////////////////////////////////////////////
// Function: ThreadSimpleManager::system_yield
// Access: Private, Static
// Description: Calls the appropriate system function to yield
// the whole process to any other system processes.
////////////////////////////////////////////////////////////////////
void ThreadSimpleManager::
system_yield() {
#ifdef WIN32
Sleep(0);
#else
struct timespec rqtp;
rqtp.tv_sec = 0;
rqtp.tv_nsec = 0;
nanosleep(&rqtp, NULL);
#endif // WIN32
}
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function: ThreadSimpleManager::report_deadlock // Function: ThreadSimpleManager::report_deadlock
// Access: Private // Access: Private

View File

@ -79,6 +79,7 @@ private:
void choose_next_context(); void choose_next_context();
void wake_sleepers(double now); void wake_sleepers(double now);
static void system_sleep(double seconds); static void system_sleep(double seconds);
static void system_yield();
void report_deadlock(); void report_deadlock();
// STL function object to sort the priority queue of sleeping threads. // STL function object to sort the priority queue of sleeping threads.
@ -98,6 +99,7 @@ private:
// FIFO list of ready threads. // FIFO list of ready threads.
FifoThreads _ready; FifoThreads _ready;
FifoThreads _next_ready;
typedef pmap<BlockerSimple *, FifoThreads> Blocked; typedef pmap<BlockerSimple *, FifoThreads> Blocked;
Blocked _blocked; Blocked _blocked;

View File

@ -18,6 +18,7 @@
#include "pnmReader.h" #include "pnmReader.h"
#include "virtualFileSystem.h" #include "virtualFileSystem.h"
#include "thread.h"
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function: PNMReader::Destructor // Function: PNMReader::Destructor
@ -97,6 +98,7 @@ read_data(xel *array, xelval *alpha) {
int y; int y;
for (y = 0; y < _y_size; ++y) { for (y = 0; y < _y_size; ++y) {
if (!read_row(array + y * _x_size, alpha + y * _x_size, _x_size, _y_size)) { if (!read_row(array + y * _x_size, alpha + y * _x_size, _x_size, _y_size)) {
Thread::consider_yield();
return y; return y;
} }
} }
@ -129,6 +131,7 @@ read_data(xel *array, xelval *alpha) {
for (int yi = 0; yi < y_reduction; ++yi) { for (int yi = 0; yi < y_reduction; ++yi) {
// OK, read a row. This reads the original, full-size row. // OK, read a row. This reads the original, full-size row.
if (!read_row(orig_row_array, orig_row_alpha, _orig_x_size, _orig_y_size)) { if (!read_row(orig_row_array, orig_row_alpha, _orig_x_size, _orig_y_size)) {
Thread::consider_yield();
return y; return y;
} }

View File

@ -17,6 +17,7 @@
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
#include "pnmWriter.h" #include "pnmWriter.h"
#include "thread.h"
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function: PNMWriter::Destructor // Function: PNMWriter::Destructor
@ -66,6 +67,7 @@ write_data(xel *array, xelval *alpha) {
int y; int y;
for (y = 0; y < _y_size; y++) { for (y = 0; y < _y_size; y++) {
if (!write_row(array + y * _x_size, alpha + y * _x_size)) { if (!write_row(array + y * _x_size, alpha + y * _x_size)) {
Thread::consider_yield();
return y; return y;
} }
} }

View File

@ -25,6 +25,7 @@
#include "bmp.h" #include "bmp.h"
#include "ppmcmap.h" #include "ppmcmap.h"
#include "pnmbitio.h" #include "pnmbitio.h"
#include "thread.h"
// Much code in this file is borrowed from Netpbm, specifically ppmtobmp.c. // Much code in this file is borrowed from Netpbm, specifically ppmtobmp.c.
/* /*
@ -629,6 +630,7 @@ write_data(xel *array, xelval *) {
BMPEncode(_file, classv, _x_size, _y_size, pixels, colors, cht, BMPEncode(_file, classv, _x_size, _y_size, pixels, colors, cht,
Red, Green, Blue); Red, Green, Blue);
} }
Thread::consider_yield();
return _y_size; return _y_size;
} }

View File

@ -21,7 +21,7 @@
#ifdef HAVE_JPEG #ifdef HAVE_JPEG
#include "config_pnmimagetypes.h" #include "config_pnmimagetypes.h"
#include "thread.h"
// //
// The following bit of code, for setting up jpeg_istream_src(), was // The following bit of code, for setting up jpeg_istream_src(), was
@ -126,6 +126,7 @@ fill_input_buffer (j_decompress_ptr cinfo)
src->infile->read((char *)src->buffer, INPUT_BUF_SIZE); src->infile->read((char *)src->buffer, INPUT_BUF_SIZE);
nbytes = src->infile->gcount(); nbytes = src->infile->gcount();
Thread::consider_yield();
if (nbytes <= 0) { if (nbytes <= 0) {
if (src->start_of_file) /* Treat empty input file as fatal error */ if (src->start_of_file) /* Treat empty input file as fatal error */

View File

@ -24,6 +24,7 @@
#include "pnmImage.h" #include "pnmImage.h"
#include "pnmWriter.h" #include "pnmWriter.h"
#include "thread.h"
// //
@ -122,6 +123,7 @@ empty_output_buffer (j_compress_ptr cinfo)
dest->pub.next_output_byte = dest->buffer; dest->pub.next_output_byte = dest->buffer;
dest->pub.free_in_buffer = OUTPUT_BUF_SIZE; dest->pub.free_in_buffer = OUTPUT_BUF_SIZE;
Thread::consider_yield();
return TRUE; return TRUE;
} }
@ -148,6 +150,7 @@ term_destination (j_compress_ptr cinfo)
ERREXIT(cinfo, JERR_FILE_WRITE); ERREXIT(cinfo, JERR_FILE_WRITE);
} }
dest->outfile->flush(); dest->outfile->flush();
Thread::consider_yield();
/* Make sure we wrote the output file OK */ /* Make sure we wrote the output file OK */
if (dest->outfile->fail()) if (dest->outfile->fail())
ERREXIT(cinfo, JERR_FILE_WRITE); ERREXIT(cinfo, JERR_FILE_WRITE);

View File

@ -24,6 +24,7 @@
#include "pnmFileTypeRegistry.h" #include "pnmFileTypeRegistry.h"
#include "bamReader.h" #include "bamReader.h"
#include "thread.h"
static const char * const extensions_png[] = { static const char * const extensions_png[] = {
"png" "png"
@ -453,6 +454,7 @@ png_read_data(png_structp png_ptr, png_bytep data, png_size_t length) {
<< "Didn't read enough bytes.\n"; << "Didn't read enough bytes.\n";
// Is there no way to indicate a read failure to libpng? // Is there no way to indicate a read failure to libpng?
} }
Thread::consider_yield();
} }
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
@ -774,6 +776,7 @@ write_data(xel *array, xelval *alpha_data) {
nassertr(dest <= row + row_byte_length, yi); nassertr(dest <= row + row_byte_length, yi);
png_write_row(_png, row); png_write_row(_png, row);
Thread::consider_yield();
} }
delete[] row; delete[] row;
@ -831,7 +834,7 @@ make_png_bit_depth(int bit_depth) {
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function: PNMFileTypePNG::Writer::png_write_data // Function: PNMFileTypePNG::Writer::png_write_data
// Access: Private, Static // Access: Private, Static
// Description: A callback handler that PNG uses to write data from // Description: A callback handler that PNG uses to write data to
// the iostream. // the iostream.
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
void PNMFileTypePNG::Writer:: void PNMFileTypePNG::Writer::
@ -848,7 +851,7 @@ png_write_data(png_structp png_ptr, png_bytep data, png_size_t length) {
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function: PNMFileTypePNG::Writer::png_flush_data // Function: PNMFileTypePNG::Writer::png_flush_data
// Access: Private, Static // Access: Private, Static
// Description: A callback handler that PNG uses to write data from // Description: A callback handler that PNG uses to write data to
// the iostream. // the iostream.
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
void PNMFileTypePNG::Writer:: void PNMFileTypePNG::Writer::

View File

@ -484,6 +484,7 @@ read_bytes(istream *ifp,
readerr(ifp); readerr(ifp);
memset(buf+r, 0, n-r); memset(buf+r, 0, n-r);
} }
Thread::consider_yield();
} }

View File

@ -93,6 +93,7 @@ static tsize_t
istream_read(thandle_t fd, tdata_t buf, tsize_t size) { istream_read(thandle_t fd, tdata_t buf, tsize_t size) {
istream *in = (istream *)fd; istream *in = (istream *)fd;
in->read((char *)buf, size); in->read((char *)buf, size);
Thread::consider_yield();
return in->gcount(); return in->gcount();
} }
@ -100,6 +101,7 @@ static tsize_t
ostream_write(thandle_t fd, tdata_t buf, tsize_t size) { ostream_write(thandle_t fd, tdata_t buf, tsize_t size) {
ostream *out = (ostream *)fd; ostream *out = (ostream *)fd;
out->write((char *)buf, size); out->write((char *)buf, size);
Thread::consider_yield();
return out->fail() ? (tsize_t)0 : size; return out->fail() ? (tsize_t)0 : size;
} }

View File

@ -24,6 +24,7 @@
#include "config_express.h" #include "config_express.h"
#include "virtualFileSystem.h" #include "virtualFileSystem.h"
#include "streamReader.h" #include "streamReader.h"
#include "thread.h"
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function: DatagramInputFile::open // Function: DatagramInputFile::open
@ -112,6 +113,7 @@ read_header(string &header, size_t num_bytes) {
} }
header = string(buffer, num_bytes); header = string(buffer, num_bytes);
Thread::consider_yield();
return true; return true;
} }
@ -172,6 +174,7 @@ get_datagram(Datagram &data) {
data = Datagram(buffer, num_bytes); data = Datagram(buffer, num_bytes);
} }
Thread::consider_yield();
return true; return true;
} }

View File

@ -101,6 +101,7 @@ write_header(const string &header) {
nassertr(!_wrote_first_datagram, false); nassertr(!_wrote_first_datagram, false);
_out->write(header.data(), header.size()); _out->write(header.data(), header.size());
thread_consider_yield();
return !_out->fail(); return !_out->fail();
} }
@ -121,6 +122,7 @@ put_datagram(const Datagram &data) {
// Now, write the datagram itself. // Now, write the datagram itself.
_out->write((const char *)data.get_data(), data.get_length()); _out->write((const char *)data.get_data(), data.get_length());
thread_consider_yield();
return !_out->fail(); return !_out->fail();
} }