diff --git a/dtool/src/dtoolbase/dtoolbase.cxx b/dtool/src/dtoolbase/dtoolbase.cxx index 8c7c7e7eab..54494d06b5 100644 --- a/dtool/src/dtoolbase/dtoolbase.cxx +++ b/dtool/src/dtoolbase/dtoolbase.cxx @@ -148,4 +148,18 @@ void (*global_mark_pointer)(void *ptr, size_t size, ReferenceCount *ref_ptr) = & #else -#endif +#endif // USE_MEMORY_* + + +#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) + +static void +default_thread_yield() { +} +static void +default_thread_consider_yield() { +} +void (*global_thread_yield)() = default_thread_yield; +void (*global_thread_consider_yield)() = default_thread_consider_yield; + +#endif // HAVE_THREADS && SIMPLE_THREADS diff --git a/dtool/src/dtoolbase/dtoolbase_cc.h b/dtool/src/dtoolbase/dtoolbase_cc.h index 303a15b360..8d23ee4d34 100644 --- a/dtool/src/dtoolbase/dtoolbase_cc.h +++ b/dtool/src/dtoolbase/dtoolbase_cc.h @@ -197,6 +197,28 @@ INLINE void operator delete[](void *ptr) { #endif // REDEFINE_GLOBAL_OPERATOR_NEW #endif // USE_MEMORY_NOWRAPPERS +#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) +// We need another forward-reference function to allow low-level code +// to cooperatively yield the timeslice, in SIMPLE_THREADS mode. +extern EXPCL_DTOOL void (*global_thread_yield)(); +extern EXPCL_DTOOL void (*global_thread_consider_yield)(); + +INLINE void thread_yield() { + (*global_thread_yield)(); +} +INLINE void thread_consider_yield() { + (*global_thread_consider_yield)(); +} + +#else + +INLINE void thread_yield() { +} +INLINE void thread_consider_yield() { +} + +#endif // HAVE_THREADS && SIMPLE_THREADS + #if defined(USE_TAU) && defined(WIN32) // Hack around tau's lack of DLL export declarations for Profiler class. extern EXPCL_DTOOL bool __tau_shutdown; diff --git a/dtool/src/dtoolutil/filename.cxx b/dtool/src/dtoolutil/filename.cxx index 017a8e5eac..c2464d7d5a 100644 --- a/dtool/src/dtoolutil/filename.cxx +++ b/dtool/src/dtoolutil/filename.cxx @@ -1556,6 +1556,7 @@ scan_directory(vector_string &contents) const { } do { + thread_consider_yield(); string filename = find_data.cFileName; if (filename != "." && filename != "..") { contents.push_back(filename); @@ -1588,6 +1589,7 @@ scan_directory(vector_string &contents) const { struct dirent *d; d = readdir(root); while (d != (struct dirent *)NULL) { + thread_consider_yield(); if (d->d_name[0] != '.') { contents.push_back(d->d_name); } diff --git a/dtool/src/prc/encryptStreamBuf.cxx b/dtool/src/prc/encryptStreamBuf.cxx index 1d1638e973..9ffac25ed1 100644 --- a/dtool/src/prc/encryptStreamBuf.cxx +++ b/dtool/src/prc/encryptStreamBuf.cxx @@ -186,6 +186,7 @@ open_read(istream *source, bool owns_source, const string &password) { _read_overflow_buffer = new unsigned char[_read_block_size]; _in_read_overflow_buffer = 0; + thread_consider_yield(); } //////////////////////////////////////////////////////////////////// @@ -300,6 +301,7 @@ open_write(ostream *dest, bool owns_dest, const string &password) { sw.append_data(iv, iv_length); _write_valid = true; + thread_consider_yield(); } //////////////////////////////////////////////////////////////////// @@ -318,6 +320,7 @@ close_write() { unsigned char *write_buffer = (unsigned char *)alloca(_write_block_size); int bytes_written = 0; EVP_EncryptFinal(&_write_ctx, write_buffer, &bytes_written); + thread_consider_yield(); _dest->write((const char *)write_buffer, bytes_written); @@ -463,6 +466,7 @@ read_chars(char *start, size_t length) { _read_valid = false; } } + thread_consider_yield(); } while (bytes_read == 0); @@ -504,6 +508,7 @@ write_chars(const char *start, size_t length) { prc_cat.error() << "Error encrypting stream.\n"; } + thread_consider_yield(); _dest->write((const char *)write_buffer, bytes_written); } } diff --git a/panda/src/audiotraits/fmodAudioManager.cxx b/panda/src/audiotraits/fmodAudioManager.cxx index b45947b608..f977352388 100644 --- a/panda/src/audiotraits/fmodAudioManager.cxx +++ b/panda/src/audiotraits/fmodAudioManager.cxx @@ -756,6 +756,7 @@ read_callback(void *handle, void *buffer, unsigned int size_bytes, istream *str = (istream *)handle; str->read((char *)buffer, size_bytes); (*bytes_read) = str->gcount(); + thread_consider_yield(); if (str->eof()) { if ((*bytes_read) == 0) { diff --git a/panda/src/downloader/bioStreamBuf.cxx b/panda/src/downloader/bioStreamBuf.cxx index 2c907fdb16..d88ed96064 100644 --- a/panda/src/downloader/bioStreamBuf.cxx +++ b/panda/src/downloader/bioStreamBuf.cxx @@ -170,6 +170,7 @@ underflow() { // BIO_read might return -1 or -2 on eof or error, so we have to // allow for negative numbers. int read_count = BIO_read(*_source, gptr(), buffer_size); + thread_consider_yield(); if (read_count != (int)num_bytes) { // Oops, we didn't read what we thought we would. @@ -239,6 +240,7 @@ write_chars(const char *start, size_t length) { downloader_cat.spam() << "wrote " << write_count << " bytes.\n"; } + thread_consider_yield(); while (write_count != (int)(length - wrote_so_far)) { if (write_count <= 0) { _is_closed = !BIO_should_retry(*_source); @@ -275,6 +277,7 @@ write_chars(const char *start, size_t length) { downloader_cat.spam() << "continued, wrote " << write_count << " bytes.\n"; } + thread_consider_yield(); } } diff --git a/panda/src/downloader/httpChannel.I b/panda/src/downloader/httpChannel.I index ce26171a0c..ae87609ce9 100644 --- a/panda/src/downloader/httpChannel.I +++ b/panda/src/downloader/httpChannel.I @@ -666,7 +666,8 @@ send_extra_header(const string &key, const string &value) { INLINE bool HTTPChannel:: get_document(const DocumentSpec &url) { begin_request(HTTPEnum::M_get, url, string(), false, 0, 0); - run(); + while (run()) { + } return is_valid(); } @@ -683,7 +684,8 @@ get_document(const DocumentSpec &url) { INLINE bool HTTPChannel:: get_subdocument(const DocumentSpec &url, size_t first_byte, size_t last_byte) { begin_request(HTTPEnum::M_get, url, string(), false, first_byte, last_byte); - run(); + while (run()) { + } return is_valid(); } @@ -699,7 +701,8 @@ get_subdocument(const DocumentSpec &url, size_t first_byte, size_t last_byte) { INLINE bool HTTPChannel:: get_header(const DocumentSpec &url) { begin_request(HTTPEnum::M_head, url, string(), false, 0, 0); - run(); + while (run()) { + } return is_valid(); } @@ -712,7 +715,8 @@ get_header(const DocumentSpec &url) { INLINE bool HTTPChannel:: post_form(const DocumentSpec &url, const string &body) { begin_request(HTTPEnum::M_post, url, body, false, 0, 0); - run(); + while (run()) { + } return is_valid(); } @@ -725,7 +729,8 @@ post_form(const DocumentSpec &url, const string &body) { INLINE bool HTTPChannel:: put_document(const DocumentSpec &url, const string &body) { begin_request(HTTPEnum::M_put, url, body, false, 0, 0); - run(); + while (run()) { + } return is_valid(); } @@ -737,7 +742,8 @@ put_document(const DocumentSpec &url, const string &body) { INLINE bool HTTPChannel:: delete_document(const DocumentSpec &url) { begin_request(HTTPEnum::M_delete, url, string(), false, 0, 0); - run(); + while (run()) { + } return is_valid(); } @@ -751,7 +757,8 @@ delete_document(const DocumentSpec &url) { INLINE bool HTTPChannel:: get_trace(const DocumentSpec &url) { begin_request(HTTPEnum::M_trace, url, string(), false, 0, 0); - run(); + while (run()) { + } return is_valid(); } @@ -770,7 +777,8 @@ get_trace(const DocumentSpec &url) { INLINE bool HTTPChannel:: connect_to(const DocumentSpec &url) { begin_request(HTTPEnum::M_connect, url, string(), false, 0, 0); - run(); + while (run()) { + } return is_connection_ready(); } @@ -784,7 +792,8 @@ connect_to(const DocumentSpec &url) { INLINE bool HTTPChannel:: get_options(const DocumentSpec &url) { begin_request(HTTPEnum::M_options, url, string(), false, 0, 0); - run(); + while (run()) { + } return is_valid(); } diff --git a/panda/src/downloader/httpChannel.cxx b/panda/src/downloader/httpChannel.cxx index c111f2cee5..bb8c949434 100644 --- a/panda/src/downloader/httpChannel.cxx +++ b/panda/src/downloader/httpChannel.cxx @@ -58,8 +58,16 @@ HTTPChannel(HTTPClient *client) : _seconds_per_update = downloader_frequency; _max_updates_per_second = 1.0f / _seconds_per_update; _bytes_per_update = int(_max_bytes_per_second * _seconds_per_update); + + // _nonblocking is true if the socket is actually in non-blocking mode. _nonblocking = false; + // _wanted_nonblocking is true if the user specifically requested + // one of the non-blocking interfaces. It is false if the socket is + // only incidentally non-blocking (for instance, because + // SIMPLE_THREADS is on). + _wanted_nonblocking = false; + _want_ssl = false; _proxy_serves_document = false; _proxy_tunnel_now = false; @@ -353,11 +361,12 @@ run() { } if (_started_download) { - if (_nonblocking && _download_throttle) { + if (_wanted_nonblocking && _download_throttle) { double now = TrueClock::get_global_ptr()->get_short_time(); double elapsed = now - _last_run_time; if (elapsed < _seconds_per_update) { // Come back later. + thread_yield(); return true; } int num_potential_updates = (int)(elapsed / _seconds_per_update); @@ -370,16 +379,22 @@ run() { << _bytes_requested << "\n"; } } + + bool repeat_later; switch (_download_dest) { case DD_none: - return false; // We're done. + repeat_later = false; // We're done. case DD_file: - return run_download_to_file(); + repeat_later = run_download_to_file(); case DD_ram: - return run_download_to_ram(); + repeat_later = run_download_to_ram(); } + if (repeat_later) { + thread_yield(); + } + return repeat_later; } if (downloader_cat.is_spam()) { @@ -399,7 +414,7 @@ run() { // reestablish the connection if it has been dropped. if (_bio.is_null() && _state != S_try_next_proxy) { if (_connect_count > http_max_connect_count) { - // Too many connection attempts, just give up. We should + // Too many connection attempts; just give up. We should // never trigger this failsafe, since the code in each // individual case has similar logic to prevent more than two // consecutive lost connections. @@ -534,6 +549,7 @@ run() { // We've reached our terminal state. return reached_done_state(); } + thread_consider_yield(); } while (!repeat_later || _bio.is_null()); if (downloader_cat.is_spam()) { @@ -541,6 +557,8 @@ run() { << "later run(), _state = " << _state << ", _done_state = " << _done_state << "\n"; } + + thread_yield(); return true; } @@ -631,7 +649,7 @@ download_to_file(const Filename &filename, bool subdocument_resumes) { _download_dest = DD_file; - if (_nonblocking) { + if (_wanted_nonblocking) { // In nonblocking mode, we can't start the download yet; that will // be done later as run() is called. return true; @@ -643,7 +661,8 @@ download_to_file(const Filename &filename, bool subdocument_resumes) { return false; } - run(); + while (run()) { + } return is_download_complete(); } @@ -687,7 +706,7 @@ download_to_ram(Ramfile *ramfile, bool subdocument_resumes) { _download_dest = DD_ram; _subdocument_resumes = (subdocument_resumes && _first_byte_delivered != 0); - if (_nonblocking) { + if (_wanted_nonblocking) { // In nonblocking mode, we can't start the download yet; that will // be done later as run() is called. return true; @@ -699,7 +718,8 @@ download_to_ram(Ramfile *ramfile, bool subdocument_resumes) { return false; } - run(); + while (run()) { + } return is_download_complete(); } @@ -2071,7 +2091,7 @@ bool HTTPChannel:: run_download_to_file() { nassertr(_body_stream != (ISocketStream *)NULL, false); - bool do_throttle = _nonblocking && _download_throttle; + bool do_throttle = _wanted_nonblocking && _download_throttle; static const size_t buffer_size = 1024; char buffer[buffer_size]; @@ -2146,7 +2166,7 @@ run_download_to_ram() { nassertr(_body_stream != (ISocketStream *)NULL, false); nassertr(_download_to_ramfile != (Ramfile *)NULL, false); - bool do_throttle = _nonblocking && _download_throttle; + bool do_throttle = _wanted_nonblocking && _download_throttle; static const size_t buffer_size = 1024; char buffer[buffer_size]; @@ -2212,6 +2232,13 @@ begin_request(HTTPEnum::Method method, const DocumentSpec &url, size_t first_byte, size_t last_byte) { reset_for_new_request(); + _wanted_nonblocking = nonblocking; +#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) + // In the presence of SIMPLE_THREADS, we always use non-blocking + // I/O. We simulate blocking by yielding the thread. + nonblocking = true; +#endif + // Get the set of proxies that are appropriate for this URL. _proxies.clear(); _proxy_next_index = 0; diff --git a/panda/src/downloader/httpChannel.h b/panda/src/downloader/httpChannel.h index 396bd3fe91..32f2690afb 100644 --- a/panda/src/downloader/httpChannel.h +++ b/panda/src/downloader/httpChannel.h @@ -323,6 +323,7 @@ private: double _seconds_per_update; int _bytes_per_update; bool _nonblocking; + bool _wanted_nonblocking; string _send_extra_headers; DocumentSpec _document_spec; diff --git a/panda/src/egg/eggMesher.cxx b/panda/src/egg/eggMesher.cxx index 2360be8d8e..bdb007b395 100644 --- a/panda/src/egg/eggMesher.cxx +++ b/panda/src/egg/eggMesher.cxx @@ -26,6 +26,7 @@ #include "config_egg.h" #include "eggGroupNode.h" #include "dcast.h" +#include "thread.h" #include @@ -325,6 +326,8 @@ do_mesh() { // Finally, do the longer strips. mesh_list(_strips); + + Thread::consider_yield(); } //////////////////////////////////////////////////////////////////// diff --git a/panda/src/egg/lexer.lxx b/panda/src/egg/lexer.lxx index 515ddce0da..b902820769 100644 --- a/panda/src/egg/lexer.lxx +++ b/panda/src/egg/lexer.lxx @@ -14,6 +14,7 @@ #include "indent.h" #include "pnotify.h" #include "pmutex.h" +#include "thread.h" #include @@ -188,6 +189,7 @@ input_chars(char *buffer, int &result, int max_size) { // End of file or I/O error. result = 0; } + Thread::consider_yield(); } #undef YY_INPUT #define YY_INPUT(buffer, result, max_size) input_chars(buffer, result, max_size) diff --git a/panda/src/egg2pg/eggLoader.cxx b/panda/src/egg2pg/eggLoader.cxx index 648e797e09..8f6bec27b7 100644 --- a/panda/src/egg2pg/eggLoader.cxx +++ b/panda/src/egg2pg/eggLoader.cxx @@ -92,6 +92,7 @@ #include "transformBlend.h" #include "sparseArray.h" #include "bitArray.h" +#include "thread.h" #include #include @@ -2321,6 +2322,7 @@ make_vertex_data(const EggRenderState *render_state, (VertexPoolData::value_type(vpt, vertex_data)).second; nassertr(inserted, vertex_data); + Thread::consider_yield(); return vertex_data; } diff --git a/panda/src/express/virtualFile.cxx b/panda/src/express/virtualFile.cxx index 10c8e22870..b65bf7d41c 100644 --- a/panda/src/express/virtualFile.cxx +++ b/panda/src/express/virtualFile.cxx @@ -284,6 +284,7 @@ read_file(istream *in, string &result) { in->read(buffer, buffer_size); size_t count = in->gcount(); while (count != 0) { + thread_consider_yield(); result_vec.insert(result_vec.end(), buffer, buffer + count); in->read(buffer, buffer_size); count = in->gcount(); @@ -309,6 +310,7 @@ read_file(istream *in, string &result, size_t max_bytes) { in->read(buffer, min(buffer_size, max_bytes)); size_t count = in->gcount(); while (count != 0) { + thread_consider_yield(); nassertr(count <= max_bytes, false); result_vec.insert(result_vec.end(), buffer, buffer + count); max_bytes -= count; diff --git a/panda/src/express/zStreamBuf.cxx b/panda/src/express/zStreamBuf.cxx index 778cd770ba..89bffab03b 100644 --- a/panda/src/express/zStreamBuf.cxx +++ b/panda/src/express/zStreamBuf.cxx @@ -86,6 +86,7 @@ open_read(istream *source, bool owns_source) { show_zlib_error("inflateInit", result, _z_source); close_read(); } + thread_consider_yield(); } //////////////////////////////////////////////////////////////////// @@ -101,6 +102,7 @@ close_read() { if (result < 0) { show_zlib_error("inflateEnd", result, _z_source); } + thread_consider_yield(); if (_owns_source) { delete _source; @@ -130,6 +132,7 @@ open_write(ostream *dest, bool owns_dest, int compression_level) { show_zlib_error("deflateInit", result, _z_dest); close_write(); } + thread_consider_yield(); } //////////////////////////////////////////////////////////////////// @@ -148,6 +151,7 @@ close_write() { if (result < 0) { show_zlib_error("deflateEnd", result, _z_dest); } + thread_consider_yield(); if (_owns_dest) { delete _dest; @@ -260,6 +264,7 @@ read_chars(char *start, size_t length) { _z_source.avail_in = read_count; } int result = inflate(&_z_source, flush); + thread_consider_yield(); size_t bytes_read = length - _z_source.avail_out; if (result == Z_STREAM_END) { @@ -302,6 +307,7 @@ write_chars(const char *start, size_t length, int flush) { if (result < 0 && result != Z_BUF_ERROR) { show_zlib_error("deflate", result, _z_dest); } + thread_consider_yield(); while (_z_dest.avail_in != 0) { if (_z_dest.avail_out != compress_buffer_size) { @@ -313,6 +319,7 @@ write_chars(const char *start, size_t length, int flush) { if (result < 0) { show_zlib_error("deflate", result, _z_dest); } + thread_consider_yield(); } while (_z_dest.avail_out != compress_buffer_size) { @@ -323,6 +330,7 @@ write_chars(const char *start, size_t length, int flush) { if (result < 0 && result != Z_BUF_ERROR) { show_zlib_error("deflate", result, _z_dest); } + thread_consider_yield(); } } diff --git a/panda/src/gobj/vertexDataSaveFile.cxx b/panda/src/gobj/vertexDataSaveFile.cxx index 148a58f110..f3d3b6428d 100644 --- a/panda/src/gobj/vertexDataSaveFile.cxx +++ b/panda/src/gobj/vertexDataSaveFile.cxx @@ -93,7 +93,13 @@ VertexDataSaveFile(const Filename &directory, const string &prefix, #else // Posix case. - _fd = open(os_specific.c_str(), O_RDWR | O_CREAT, 0666); + int flags = O_RDWR | O_CREAT; +#if defined(HAVE_THREADS) && defined(SIMPLE_THREADS) + // In SIMPLE_THREADS mode, we use non-blocking I/O. + flags |= O_NONBLOCK; +#endif + + _fd = open(os_specific.c_str(), flags, 0666); if (_fd == -1) { // Couldn't open the file: permissions problem or bad directory. if (!_filename.exists()) { @@ -210,11 +216,21 @@ write_data(const unsigned char *data, size_t size, bool compressed) { return false; } - ssize_t result = ::write(_fd, data, size); - if (result != (ssize_t)size) { - gobj_cat.error() - << "Error writing " << size << " bytes to save file. Disk full?\n"; - return NULL; + while (size > 0) { + ssize_t result = ::write(_fd, data, size); + if (result == -1) { + if (errno == EAGAIN) { + Thread::force_yield(); + } else { + gobj_cat.error() + << "Error writing " << size << " bytes to save file. Disk full?\n"; + return NULL; + } + } + + Thread::consider_yield(); + data += result; + size -= result; } #endif // _WIN32 @@ -263,11 +279,21 @@ read_data(unsigned char *data, size_t size, VertexDataSaveBlock *block) { << "Error seeking to position " << block->get_start() << " in save file.\n"; return false; } - ssize_t result = read(_fd, data, size); - if (result != (ssize_t)size) { - gobj_cat.error() - << "Error reading " << size << " bytes from save file.\n"; - return false; + while (size > 0) { + ssize_t result = read(_fd, data, size); + if (result == -1) { + if (errno == EAGAIN) { + Thread::force_yield(); + } else { + gobj_cat.error() + << "Error reading " << size << " bytes from save file.\n"; + return false; + } + } + + Thread::consider_yield(); + data += result; + size -= result; } #endif // _WIN32 diff --git a/panda/src/net/connectionReader.cxx b/panda/src/net/connectionReader.cxx index 74490b08ba..57cb2b0bac 100644 --- a/panda/src/net/connectionReader.cxx +++ b/panda/src/net/connectionReader.cxx @@ -102,7 +102,7 @@ ConnectionReader:: ConnectionReader(ConnectionManager *manager, int num_threads) : _manager(manager) { -#ifndef HAVE_THREADS +#if !defined(HAVE_THREADS) || defined(SIMPLE_THREADS) #ifndef NDEBUG if (num_threads != 0) { net_cat.error() diff --git a/panda/src/net/connectionWriter.cxx b/panda/src/net/connectionWriter.cxx index feb24a3342..c142b4c309 100644 --- a/panda/src/net/connectionWriter.cxx +++ b/panda/src/net/connectionWriter.cxx @@ -61,11 +61,13 @@ ConnectionWriter:: ConnectionWriter(ConnectionManager *manager, int num_threads) : _manager(manager) { -#ifndef HAVE_THREADS +#if !defined(HAVE_THREADS) || defined(SIMPLE_THREADS) // Although this code is written to use thread-locking primitives // regardless of the definition of HAVE_THREADS, it is not safe to // spawn multiple threads when HAVE_THREADS is not true, since we - // might be using a non-thread-safe malloc scheme. + // might be using a non-thread-safe malloc scheme. Also, there is + // no point in using threads for this kind of I/O if SIMPLE_THREADS + // is defined. #ifndef NDEBUG if (num_threads != 0) { net_cat.error() diff --git a/panda/src/pgraph/sceneGraphReducer.cxx b/panda/src/pgraph/sceneGraphReducer.cxx index 3205b1576b..cee6a428d6 100644 --- a/panda/src/pgraph/sceneGraphReducer.cxx +++ b/panda/src/pgraph/sceneGraphReducer.cxx @@ -25,6 +25,7 @@ #include "plist.h" #include "pmap.h" #include "geomNode.h" +#include "thread.h" PStatCollector SceneGraphReducer::_flatten_collector("*:Flatten:flatten"); PStatCollector SceneGraphReducer::_apply_collector("*:Flatten:apply"); @@ -771,7 +772,8 @@ r_collect_vertex_data(PandaNode *node, int collect_bits, r_collect_vertex_data(children.get_child(i), collect_bits, transformer); } } - + + Thread::consider_yield(); return num_created; } @@ -840,6 +842,7 @@ r_unify(PandaNode *node, int max_indices) { for (int i = 0; i < num_children; ++i) { r_unify(children.get_child(i), max_indices); } + Thread::consider_yield(); } //////////////////////////////////////////////////////////////////// diff --git a/panda/src/pipeline/threadSimpleManager.cxx b/panda/src/pipeline/threadSimpleManager.cxx index 9027a695e0..8b7bec8954 100644 --- a/panda/src/pipeline/threadSimpleManager.cxx +++ b/panda/src/pipeline/threadSimpleManager.cxx @@ -41,6 +41,11 @@ ThreadSimpleManager() { _current_thread = NULL; _clock = TrueClock::get_global_ptr(); _waiting_for_exit = NULL; + + // Install these global pointers so very low-level code (code + // defined before the pipeline directory) can yield when necessary. + global_thread_yield = &Thread::force_yield; + global_thread_consider_yield = &Thread::consider_yield; } //////////////////////////////////////////////////////////////////// @@ -53,7 +58,9 @@ ThreadSimpleManager() { //////////////////////////////////////////////////////////////////// void ThreadSimpleManager:: enqueue_ready(ThreadSimpleImpl *thread) { - _ready.push_back(thread); + // We actually add it to _next_ready, so that we can tell when we + // have processed every thread in a given epoch. + _next_ready.push_back(thread); } //////////////////////////////////////////////////////////////////// @@ -353,9 +360,26 @@ choose_next_context() { // Choose a new thread to execute. while (_ready.empty()) { - // No threads are ready. They must all be sleeping. - if (_sleeping.empty()) { - // No threads at all! + if (!_next_ready.empty()) { + // We've finished an epoch. + _ready.swap(_next_ready); + system_yield(); + + } else if (!_sleeping.empty()) { + // All threads are sleeping. + double wait = _sleeping.front()->_start_time - now; + if (wait > 0.0) { + if (thread_cat.is_debug()) { + thread_cat.debug() + << "Sleeping all threads " << wait << " seconds\n"; + } + system_sleep(wait); + } + now = get_current_time(); + wake_sleepers(now); + + } else { + // No threads are ready! if (!_blocked.empty()) { thread_cat.error() << "Deadlock! All threads blocked.\n"; @@ -372,24 +396,13 @@ choose_next_context() { break; } - // No threads are queued anywhere. This is kind of an error, - // since normally the main thread, at least, should be queued - // somewhere. + // No threads are queued anywhere. This is some kind of + // internal error, since normally the main thread, at least, + // should be queued somewhere. thread_cat.error() << "All threads disappeared!\n"; exit(0); } - - double wait = _sleeping.front()->_start_time - now; - if (wait > 0.0) { - if (thread_cat.is_debug()) { - thread_cat.debug() - << "Sleeping all threads " << wait << " seconds\n"; - } - system_sleep(wait); - } - now = get_current_time(); - wake_sleepers(now); } _current_thread = _ready.front(); @@ -454,6 +467,25 @@ system_sleep(double seconds) { #endif // WIN32 } +//////////////////////////////////////////////////////////////////// +// Function: ThreadSimpleManager::system_yield +// Access: Private, Static +// Description: Calls the appropriate system function to yield +// the whole process to any other system processes. +//////////////////////////////////////////////////////////////////// +void ThreadSimpleManager:: +system_yield() { +#ifdef WIN32 + Sleep(0); + +#else + struct timespec rqtp; + rqtp.tv_sec = 0; + rqtp.tv_nsec = 0; + nanosleep(&rqtp, NULL); +#endif // WIN32 +} + //////////////////////////////////////////////////////////////////// // Function: ThreadSimpleManager::report_deadlock // Access: Private diff --git a/panda/src/pipeline/threadSimpleManager.h b/panda/src/pipeline/threadSimpleManager.h index 69c15159ff..6814e95289 100644 --- a/panda/src/pipeline/threadSimpleManager.h +++ b/panda/src/pipeline/threadSimpleManager.h @@ -79,6 +79,7 @@ private: void choose_next_context(); void wake_sleepers(double now); static void system_sleep(double seconds); + static void system_yield(); void report_deadlock(); // STL function object to sort the priority queue of sleeping threads. @@ -98,6 +99,7 @@ private: // FIFO list of ready threads. FifoThreads _ready; + FifoThreads _next_ready; typedef pmap Blocked; Blocked _blocked; diff --git a/panda/src/pnmimage/pnmReader.cxx b/panda/src/pnmimage/pnmReader.cxx index e24671a2be..d559466f23 100644 --- a/panda/src/pnmimage/pnmReader.cxx +++ b/panda/src/pnmimage/pnmReader.cxx @@ -18,6 +18,7 @@ #include "pnmReader.h" #include "virtualFileSystem.h" +#include "thread.h" //////////////////////////////////////////////////////////////////// // Function: PNMReader::Destructor @@ -97,6 +98,7 @@ read_data(xel *array, xelval *alpha) { int y; for (y = 0; y < _y_size; ++y) { if (!read_row(array + y * _x_size, alpha + y * _x_size, _x_size, _y_size)) { + Thread::consider_yield(); return y; } } @@ -129,6 +131,7 @@ read_data(xel *array, xelval *alpha) { for (int yi = 0; yi < y_reduction; ++yi) { // OK, read a row. This reads the original, full-size row. if (!read_row(orig_row_array, orig_row_alpha, _orig_x_size, _orig_y_size)) { + Thread::consider_yield(); return y; } diff --git a/panda/src/pnmimage/pnmWriter.cxx b/panda/src/pnmimage/pnmWriter.cxx index dedbd91fd6..4a9a32e15f 100644 --- a/panda/src/pnmimage/pnmWriter.cxx +++ b/panda/src/pnmimage/pnmWriter.cxx @@ -17,6 +17,7 @@ //////////////////////////////////////////////////////////////////// #include "pnmWriter.h" +#include "thread.h" //////////////////////////////////////////////////////////////////// // Function: PNMWriter::Destructor @@ -66,6 +67,7 @@ write_data(xel *array, xelval *alpha) { int y; for (y = 0; y < _y_size; y++) { if (!write_row(array + y * _x_size, alpha + y * _x_size)) { + Thread::consider_yield(); return y; } } diff --git a/panda/src/pnmimagetypes/pnmFileTypeBMPWriter.cxx b/panda/src/pnmimagetypes/pnmFileTypeBMPWriter.cxx index 25297ce8fb..bb84499d06 100644 --- a/panda/src/pnmimagetypes/pnmFileTypeBMPWriter.cxx +++ b/panda/src/pnmimagetypes/pnmFileTypeBMPWriter.cxx @@ -25,6 +25,7 @@ #include "bmp.h" #include "ppmcmap.h" #include "pnmbitio.h" +#include "thread.h" // Much code in this file is borrowed from Netpbm, specifically ppmtobmp.c. /* @@ -629,6 +630,7 @@ write_data(xel *array, xelval *) { BMPEncode(_file, classv, _x_size, _y_size, pixels, colors, cht, Red, Green, Blue); } + Thread::consider_yield(); return _y_size; } diff --git a/panda/src/pnmimagetypes/pnmFileTypeJPGReader.cxx b/panda/src/pnmimagetypes/pnmFileTypeJPGReader.cxx index 5618cbf815..3edee02130 100644 --- a/panda/src/pnmimagetypes/pnmFileTypeJPGReader.cxx +++ b/panda/src/pnmimagetypes/pnmFileTypeJPGReader.cxx @@ -21,7 +21,7 @@ #ifdef HAVE_JPEG #include "config_pnmimagetypes.h" - +#include "thread.h" // // The following bit of code, for setting up jpeg_istream_src(), was @@ -126,6 +126,7 @@ fill_input_buffer (j_decompress_ptr cinfo) src->infile->read((char *)src->buffer, INPUT_BUF_SIZE); nbytes = src->infile->gcount(); + Thread::consider_yield(); if (nbytes <= 0) { if (src->start_of_file) /* Treat empty input file as fatal error */ diff --git a/panda/src/pnmimagetypes/pnmFileTypeJPGWriter.cxx b/panda/src/pnmimagetypes/pnmFileTypeJPGWriter.cxx index e2a9b76922..3b1ed9911f 100644 --- a/panda/src/pnmimagetypes/pnmFileTypeJPGWriter.cxx +++ b/panda/src/pnmimagetypes/pnmFileTypeJPGWriter.cxx @@ -24,6 +24,7 @@ #include "pnmImage.h" #include "pnmWriter.h" +#include "thread.h" // @@ -122,6 +123,7 @@ empty_output_buffer (j_compress_ptr cinfo) dest->pub.next_output_byte = dest->buffer; dest->pub.free_in_buffer = OUTPUT_BUF_SIZE; + Thread::consider_yield(); return TRUE; } @@ -148,6 +150,7 @@ term_destination (j_compress_ptr cinfo) ERREXIT(cinfo, JERR_FILE_WRITE); } dest->outfile->flush(); + Thread::consider_yield(); /* Make sure we wrote the output file OK */ if (dest->outfile->fail()) ERREXIT(cinfo, JERR_FILE_WRITE); diff --git a/panda/src/pnmimagetypes/pnmFileTypePNG.cxx b/panda/src/pnmimagetypes/pnmFileTypePNG.cxx index 1ba6861937..fb20afd580 100755 --- a/panda/src/pnmimagetypes/pnmFileTypePNG.cxx +++ b/panda/src/pnmimagetypes/pnmFileTypePNG.cxx @@ -24,6 +24,7 @@ #include "pnmFileTypeRegistry.h" #include "bamReader.h" +#include "thread.h" static const char * const extensions_png[] = { "png" @@ -453,6 +454,7 @@ png_read_data(png_structp png_ptr, png_bytep data, png_size_t length) { << "Didn't read enough bytes.\n"; // Is there no way to indicate a read failure to libpng? } + Thread::consider_yield(); } //////////////////////////////////////////////////////////////////// @@ -774,6 +776,7 @@ write_data(xel *array, xelval *alpha_data) { nassertr(dest <= row + row_byte_length, yi); png_write_row(_png, row); + Thread::consider_yield(); } delete[] row; @@ -831,7 +834,7 @@ make_png_bit_depth(int bit_depth) { //////////////////////////////////////////////////////////////////// // Function: PNMFileTypePNG::Writer::png_write_data // Access: Private, Static -// Description: A callback handler that PNG uses to write data from +// Description: A callback handler that PNG uses to write data to // the iostream. //////////////////////////////////////////////////////////////////// void PNMFileTypePNG::Writer:: @@ -848,7 +851,7 @@ png_write_data(png_structp png_ptr, png_bytep data, png_size_t length) { //////////////////////////////////////////////////////////////////// // Function: PNMFileTypePNG::Writer::png_flush_data // Access: Private, Static -// Description: A callback handler that PNG uses to write data from +// Description: A callback handler that PNG uses to write data to // the iostream. //////////////////////////////////////////////////////////////////// void PNMFileTypePNG::Writer:: diff --git a/panda/src/pnmimagetypes/pnmFileTypeSGIReader.cxx b/panda/src/pnmimagetypes/pnmFileTypeSGIReader.cxx index f364a96a90..40948e3272 100644 --- a/panda/src/pnmimagetypes/pnmFileTypeSGIReader.cxx +++ b/panda/src/pnmimagetypes/pnmFileTypeSGIReader.cxx @@ -484,6 +484,7 @@ read_bytes(istream *ifp, readerr(ifp); memset(buf+r, 0, n-r); } + Thread::consider_yield(); } diff --git a/panda/src/pnmimagetypes/pnmFileTypeTIFF.cxx b/panda/src/pnmimagetypes/pnmFileTypeTIFF.cxx index 77f907f61b..2221e2ce68 100644 --- a/panda/src/pnmimagetypes/pnmFileTypeTIFF.cxx +++ b/panda/src/pnmimagetypes/pnmFileTypeTIFF.cxx @@ -93,6 +93,7 @@ static tsize_t istream_read(thandle_t fd, tdata_t buf, tsize_t size) { istream *in = (istream *)fd; in->read((char *)buf, size); + Thread::consider_yield(); return in->gcount(); } @@ -100,6 +101,7 @@ static tsize_t ostream_write(thandle_t fd, tdata_t buf, tsize_t size) { ostream *out = (ostream *)fd; out->write((char *)buf, size); + Thread::consider_yield(); return out->fail() ? (tsize_t)0 : size; } diff --git a/panda/src/putil/datagramInputFile.cxx b/panda/src/putil/datagramInputFile.cxx index 5f6110919a..38e3679274 100644 --- a/panda/src/putil/datagramInputFile.cxx +++ b/panda/src/putil/datagramInputFile.cxx @@ -24,6 +24,7 @@ #include "config_express.h" #include "virtualFileSystem.h" #include "streamReader.h" +#include "thread.h" //////////////////////////////////////////////////////////////////// // Function: DatagramInputFile::open @@ -112,6 +113,7 @@ read_header(string &header, size_t num_bytes) { } header = string(buffer, num_bytes); + Thread::consider_yield(); return true; } @@ -172,6 +174,7 @@ get_datagram(Datagram &data) { data = Datagram(buffer, num_bytes); } + Thread::consider_yield(); return true; } diff --git a/panda/src/putil/datagramOutputFile.cxx b/panda/src/putil/datagramOutputFile.cxx index fe7d6b38e4..cb2de8a65b 100644 --- a/panda/src/putil/datagramOutputFile.cxx +++ b/panda/src/putil/datagramOutputFile.cxx @@ -101,6 +101,7 @@ write_header(const string &header) { nassertr(!_wrote_first_datagram, false); _out->write(header.data(), header.size()); + thread_consider_yield(); return !_out->fail(); } @@ -121,6 +122,7 @@ put_datagram(const Datagram &data) { // Now, write the datagram itself. _out->write((const char *)data.get_data(), data.get_length()); + thread_consider_yield(); return !_out->fail(); }