diff --git a/tests/pipeline/test_condition_var.py b/tests/pipeline/test_condition_var.py new file mode 100644 index 0000000000..6aa8bcf320 --- /dev/null +++ b/tests/pipeline/test_condition_var.py @@ -0,0 +1,137 @@ +from panda3d.core import Mutex, ConditionVarFull +from panda3d import core +from direct.stdpy import thread +import pytest + + +def yield_thread(): + # Thread.force_yield() is not enough for true-threading builds, whereas + # time.sleep() does not yield in simple-thread builds. Thread.sleep() + # seems to do the job in all cases, however. + core.Thread.sleep(0.002) + + +def test_cvar_notify(): + # Just tests that notifying without waiting does no harm. + m = Mutex() + cv = ConditionVarFull(m) + + cv.notify() + cv.notify_all() + del cv + + +def test_cvar_notify_locked(): + # Tests the same thing, but with the lock held. + m = Mutex() + cv = ConditionVarFull(m) + + m.acquire() + cv.notify() + m.release() + + m.acquire() + cv.notify_all() + m.release() + del cv + + +@pytest.mark.parametrize("num_threads", [1, 2, 3, 4]) +@pytest.mark.skipif(not core.Thread.is_threading_supported(), + reason="Threading support disabled") +def test_cvar_notify_thread(num_threads): + # Tests notify() with some number of threads waiting. + m = Mutex() + cv = ConditionVarFull(m) + + # We prematurely notify, so that we can test that it's not doing anything. + m.acquire() + cv.notify() + + state = {'waiting': 0} + + def wait_thread(): + m.acquire() + state['waiting'] += 1 + cv.wait() + state['waiting'] -= 1 + m.release() + + # Start the threads, and yield to it, giving it a chance to mess up. + threads = [] + for i in range(num_threads): + thread = core.PythonThread(wait_thread, (), "", "") + thread.start(core.TP_high, True) + + # Yield until all of the threads are waiting for the condition variable. + for i in range(1000): + m.release() + yield_thread() + m.acquire() + if state['waiting'] == num_threads: + break + + assert state['waiting'] == num_threads + m.release() + + # OK, now signal it, and yield. One thread must be unblocked per notify. + for i in range(num_threads): + cv.notify() + yield_thread() + m.acquire() + assert state['waiting'] == num_threads - i - 1 + m.release() + + for thread in threads: + thread.join() + cv = None + + +@pytest.mark.parametrize("num_threads", [1, 2, 3, 4]) +@pytest.mark.skipif(not core.Thread.is_threading_supported(), + reason="Threading support disabled") +def test_cvar_notify_all_threads(num_threads): + # Tests notify_all() with some number of threads waiting. + m = Mutex() + cv = ConditionVarFull(m) + + # We prematurely notify, so that we can test that it's not doing anything. + m.acquire() + cv.notify_all() + + state = {'waiting': 0} + + def wait_thread(): + m.acquire() + state['waiting'] += 1 + cv.wait() + state['waiting'] -= 1 + m.release() + + # Start the threads, and yield to it, giving it a chance to mess up. + threads = [] + for i in range(num_threads): + thread = core.PythonThread(wait_thread, (), "", "") + thread.start(core.TP_high, True) + + # Yield until all of the threads are waiting for the condition variable. + for i in range(1000): + m.release() + yield_thread() + m.acquire() + if state['waiting'] == num_threads: + break + + assert state['waiting'] == num_threads + m.release() + + # OK, now signal it, and yield. All threads must unblock. + cv.notify_all() + yield_thread() + m.acquire() + assert state['waiting'] == 0 + m.release() + + for thread in threads: + thread.join() + cv = None diff --git a/tests/pipeline/test_mutex.py b/tests/pipeline/test_mutex.py index 2073c09efa..668d275b08 100644 --- a/tests/pipeline/test_mutex.py +++ b/tests/pipeline/test_mutex.py @@ -1,4 +1,7 @@ from panda3d.core import Mutex, ReMutex +from panda3d import core +from random import random +import pytest def test_mutex_acquire_release(): @@ -31,6 +34,69 @@ def test_mutex_try_acquire(): m.release() +@pytest.mark.skipif(not core.Thread.is_threading_supported(), + reason="Threading support disabled") +def test_mutex_contention(): + # As a smoke test for mutexes, we just spawn a bunch of threads that do a + # lot of mutexing and hope that we can catch any obvious issues with the + # mutex implementation, especially when compiling with DEBUG_THREADS. + m1 = Mutex() + m2 = Mutex() + m3 = Mutex() + m4 = Mutex() + + def thread_acq_rel(m): + for i in range(5000): + m.acquire() + m.release() + + def thread_nested(): + for i in range(5000): + m1.acquire() + m4.acquire() + m4.release() + m1.release() + + def thread_hand_over_hand(): + m1.acquire() + for i in range(5000): + m2.acquire() + m1.release() + m3.acquire() + m2.release() + m1.acquire() + m3.release() + + m1.release() + + def thread_sleep(m): + for i in range(250): + m.acquire() + core.Thread.sleep(random() * 0.003) + m.release() + + threads = [ + core.PythonThread(thread_acq_rel, (m1,), "", ""), + core.PythonThread(thread_acq_rel, (m2,), "", ""), + core.PythonThread(thread_acq_rel, (m3,), "", ""), + core.PythonThread(thread_acq_rel, (m4,), "", ""), + core.PythonThread(thread_nested, (), "", ""), + core.PythonThread(thread_nested, (), "", ""), + core.PythonThread(thread_nested, (), "", ""), + core.PythonThread(thread_hand_over_hand, (), "", ""), + core.PythonThread(thread_hand_over_hand, (), "", ""), + core.PythonThread(thread_sleep, (m1,), "", ""), + core.PythonThread(thread_sleep, (m2,), "", ""), + core.PythonThread(thread_sleep, (m3,), "", ""), + core.PythonThread(thread_sleep, (m4,), "", ""), + ] + for thread in threads: + thread.start(core.TP_normal, True) + + for thread in threads: + thread.join() + + def test_remutex_acquire_release(): m = ReMutex() m.acquire()