task: Annotate core functions (#1548)

This commit is contained in:
WMOkiishi 2023-10-13 13:20:20 -06:00 committed by GitHub
parent 5685949588
commit 098fe634a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 249 additions and 125 deletions

View File

@ -3421,7 +3421,7 @@ class ShowBase(DirectObject.DirectObject):
# Set fWantTk to 0 to avoid starting Tk with this call
self.startDirect(fWantDirect = fDirect, fWantTk = fTk, fWantWx = fWx)
def run(self): # pylint: disable=method-hidden
def run(self) -> None: # pylint: disable=method-hidden
"""This method runs the :class:`~direct.task.Task.TaskManager`
when ``self.appRunner is None``, which is to say, when we are
not running from within a p3d file. When we *are* within a p3d

View File

@ -6,6 +6,8 @@ For more information about the task system, consult the
:ref:`tasks-and-event-handling` page in the programming manual.
"""
from __future__ import annotations
__all__ = ['Task', 'TaskManager',
'cont', 'done', 'again', 'pickup', 'exit',
'sequence', 'loop', 'pause']
@ -13,7 +15,7 @@ __all__ = ['Task', 'TaskManager',
from direct.directnotify.DirectNotifyGlobal import directNotify
from direct.showbase.PythonUtil import Functor, ScratchPad
from direct.showbase.MessengerGlobal import messenger
from typing import Any, Optional
from typing import Any, Callable, Coroutine, Final, Generator, Sequence, TypeVar, Union
import types
import random
import importlib
@ -21,7 +23,7 @@ import sys
# On Android, there's no use handling SIGINT, and in fact we can't, since we
# run the application in a separate thread from the main thread.
signal: Optional[types.ModuleType]
signal: types.ModuleType | None
if hasattr(sys, 'getandroidapilevel'):
signal = None
else:
@ -43,8 +45,15 @@ from panda3d.core import (
)
from direct.extensions_native import HTTPChannel_extensions # pylint: disable=unused-import
# The following variables are typing constructs used in annotations
# to succinctly express all the types that can be converted into tasks.
_T = TypeVar('_T', covariant=True)
_TaskCoroutine = Union[Coroutine[Any, None, _T], Generator[Any, None, _T]]
_TaskFunction = Callable[..., Union[int, _TaskCoroutine[Union[int, None]], None]]
_FuncOrTask = Union[_TaskFunction, _TaskCoroutine[Any], AsyncTask]
def print_exc_plus():
def print_exc_plus() -> None:
"""
Print the usual traceback information, followed by a listing of all the
local variables in each frame.
@ -52,12 +61,13 @@ def print_exc_plus():
import traceback
tb = sys.exc_info()[2]
assert tb is not None
while 1:
if not tb.tb_next:
break
tb = tb.tb_next
stack = []
f = tb.tb_frame
f: types.FrameType | None = tb.tb_frame
while f:
stack.append(f)
f = f.f_back
@ -84,11 +94,11 @@ def print_exc_plus():
# these Python names, and define them both at the module level, here,
# and at the class level (below). The preferred access is via the
# class level.
done = AsyncTask.DSDone
cont = AsyncTask.DSCont
again = AsyncTask.DSAgain
pickup = AsyncTask.DSPickup
exit = AsyncTask.DSExit
done: Final = AsyncTask.DSDone
cont: Final = AsyncTask.DSCont
again: Final = AsyncTask.DSAgain
pickup: Final = AsyncTask.DSPickup
exit: Final = AsyncTask.DSExit
#: Task aliases to :class:`panda3d.core.PythonTask` for historical purposes.
Task = PythonTask
@ -112,7 +122,7 @@ gather = Task.gather
shield = Task.shield
def sequence(*taskList):
def sequence(*taskList: AsyncTask) -> AsyncTaskSequence:
seq = AsyncTaskSequence('sequence')
for task in taskList:
seq.addTask(task)
@ -122,7 +132,7 @@ def sequence(*taskList):
Task.DtoolClassDict['sequence'] = staticmethod(sequence)
def loop(*taskList):
def loop(*taskList: AsyncTask) -> AsyncTaskSequence:
seq = AsyncTaskSequence('loop')
for task in taskList:
seq.addTask(task)
@ -144,10 +154,10 @@ class TaskManager:
__prevHandler: Any
def __init__(self):
def __init__(self) -> None:
self.mgr = AsyncTaskManager.getGlobalPtr()
self.resumeFunc = None
self.resumeFunc: Callable[[], object] | None = None
self.globalClock = self.mgr.getClock()
self.stepping = False
self.running = False
@ -157,12 +167,12 @@ class TaskManager:
if signal:
self.__prevHandler = signal.default_int_handler
self._frameProfileQueue = []
self._frameProfileQueue: list[tuple[int, Any, Callable[[], object] | None]] = []
# this will be set when it's safe to import StateVar
self._profileFrames = None
self._profileFrames: Any = None
self._frameProfiler = None
self._profileTasks = None
self._profileTasks: Any = None
self._taskProfiler = None
self._taskProfileInfo = ScratchPad(
taskId = None,
@ -170,7 +180,7 @@ class TaskManager:
session = None,
)
def finalInit(self):
def finalInit(self) -> None:
# This function should be called once during startup, after
# most things are imported.
from direct.fsm.StatePush import StateVar
@ -179,7 +189,7 @@ class TaskManager:
self._profileFrames = StateVar(False)
self.setProfileFrames(ConfigVariableBool('profile-frames', 0).getValue())
def destroy(self):
def destroy(self) -> None:
# This should be safe to call multiple times.
self.running = False
self.notify.info("TaskManager.destroy()")
@ -187,10 +197,10 @@ class TaskManager:
self._frameProfileQueue.clear()
self.mgr.cleanup()
def __getClock(self):
def __getClock(self) -> ClockObject:
return self.mgr.getClock()
def setClock(self, clockObject):
def setClock(self, clockObject: ClockObject) -> None:
self.mgr.setClock(clockObject)
self.globalClock = clockObject
@ -215,13 +225,13 @@ class TaskManager:
# Next time around invoke the default handler
signal.signal(signal.SIGINT, self.invokeDefaultHandler)
def getCurrentTask(self):
def getCurrentTask(self) -> AsyncTask | None:
""" Returns the task currently executing on this thread, or
None if this is being called outside of the task manager. """
return Thread.getCurrentThread().getCurrentTask()
def hasTaskChain(self, chainName):
def hasTaskChain(self, chainName: str) -> bool:
""" Returns true if a task chain with the indicated name has
already been defined, or false otherwise. Note that
setupTaskChain() will implicitly define a task chain if it has
@ -231,9 +241,16 @@ class TaskManager:
return self.mgr.findTaskChain(chainName) is not None
def setupTaskChain(self, chainName, numThreads = None, tickClock = None,
threadPriority = None, frameBudget = None,
frameSync = None, timeslicePriority = None):
def setupTaskChain(
self,
chainName: str,
numThreads: int | None = None,
tickClock: bool | None = None,
threadPriority: int | None = None,
frameBudget: float | None = None,
frameSync: bool | None = None,
timeslicePriority: bool | None = None,
) -> None:
"""Defines a new task chain. Each task chain executes tasks
potentially in parallel with all of the other task chains (if
numThreads is more than zero). When a new task is created, it
@ -297,40 +314,50 @@ class TaskManager:
if timeslicePriority is not None:
chain.setTimeslicePriority(timeslicePriority)
def hasTaskNamed(self, taskName):
def hasTaskNamed(self, taskName: str) -> bool:
"""Returns true if there is at least one task, active or
sleeping, with the indicated name. """
return bool(self.mgr.findTask(taskName))
def getTasksNamed(self, taskName):
def getTasksNamed(self, taskName: str) -> list[AsyncTask]:
"""Returns a list of all tasks, active or sleeping, with the
indicated name. """
return list(self.mgr.findTasks(taskName))
def getTasksMatching(self, taskPattern):
def getTasksMatching(self, taskPattern: GlobPattern | str) -> list[AsyncTask]:
"""Returns a list of all tasks, active or sleeping, with a
name that matches the pattern, which can include standard
shell globbing characters like \\*, ?, and []. """
return list(self.mgr.findTasksMatching(GlobPattern(taskPattern)))
def getAllTasks(self):
def getAllTasks(self) -> list[AsyncTask]:
"""Returns list of all tasks, active and sleeping, in
arbitrary order. """
return list(self.mgr.getTasks())
def getTasks(self):
def getTasks(self) -> list[AsyncTask]:
"""Returns list of all active tasks in arbitrary order. """
return list(self.mgr.getActiveTasks())
def getDoLaters(self):
def getDoLaters(self) -> list[AsyncTask]:
"""Returns list of all sleeping tasks in arbitrary order. """
return list(self.mgr.getSleepingTasks())
def doMethodLater(self, delayTime, funcOrTask, name, extraArgs = None,
sort = None, priority = None, taskChain = None,
uponDeath = None, appendTask = False, owner = None):
def doMethodLater(
self,
delayTime: float,
funcOrTask: _FuncOrTask,
name: str | None,
extraArgs: Sequence | None = None,
sort: int | None = None,
priority: int | None = None,
taskChain: str | None = None,
uponDeath: Callable[[], object] | None = None,
appendTask: bool = False,
owner = None,
) -> AsyncTask:
"""Adds a task to be performed at some time in the future.
This is identical to `add()`, except that the specified
delayTime is applied to the Task object first, which means
@ -353,9 +380,19 @@ class TaskManager:
do_method_later = doMethodLater
def add(self, funcOrTask, name = None, sort = None, extraArgs = None,
priority = None, uponDeath = None, appendTask = False,
taskChain = None, owner = None, delay = None):
def add(
self,
funcOrTask: _FuncOrTask,
name: str | None = None,
sort: int | None = None,
extraArgs: Sequence | None = None,
priority: int | None = None,
uponDeath: Callable[[], object] | None = None,
appendTask: bool = False,
taskChain: str | None = None,
owner = None,
delay: float | None = None,
) -> AsyncTask:
"""
Add a new task to the taskMgr. The task will begin executing
immediately, or next frame if its sort value has already
@ -422,7 +459,18 @@ class TaskManager:
self.mgr.add(task)
return task
def __setupTask(self, funcOrTask, name, priority, sort, extraArgs, taskChain, appendTask, owner, uponDeath):
def __setupTask(
self,
funcOrTask: _FuncOrTask,
name: str | None,
priority: int | None,
sort: int | None,
extraArgs: Sequence | None,
taskChain: str | None,
appendTask: bool,
owner,
uponDeath: Callable[[], object] | None,
) -> AsyncTask:
wasTask = False
if isinstance(funcOrTask, AsyncTask):
task = funcOrTask
@ -480,7 +528,7 @@ class TaskManager:
return task
def remove(self, taskOrName):
def remove(self, taskOrName: AsyncTask | str | list[AsyncTask | str]) -> int:
"""Removes a task from the task manager. The task is stopped,
almost as if it had returned task.done. (But if the task is
currently executing, it will finish out its current frame
@ -492,13 +540,15 @@ class TaskManager:
if isinstance(taskOrName, AsyncTask):
return self.mgr.remove(taskOrName)
elif isinstance(taskOrName, list):
count = 0
for task in taskOrName:
self.remove(task)
count += self.remove(task)
return count
else:
tasks = self.mgr.findTasks(taskOrName)
return self.mgr.remove(tasks)
def removeTasksMatching(self, taskPattern):
def removeTasksMatching(self, taskPattern: GlobPattern | str) -> int:
"""Removes all tasks whose names match the pattern, which can
include standard shell globbing characters like \\*, ?, and [].
See also :meth:`remove()`.
@ -508,7 +558,7 @@ class TaskManager:
tasks = self.mgr.findTasksMatching(GlobPattern(taskPattern))
return self.mgr.remove(tasks)
def step(self):
def step(self) -> None:
"""Invokes the task manager for one frame, and then returns.
Normally, this executes each task exactly once, though task
chains that are in sub-threads or that have frame budgets
@ -519,7 +569,7 @@ class TaskManager:
# Replace keyboard interrupt handler during task list processing
# so we catch the keyboard interrupt but don't handle it until
# after task list processing is complete.
self.fKeyboardInterrupt = 0
self.fKeyboardInterrupt = False
self.interruptCount = 0
if signal:
@ -541,7 +591,7 @@ class TaskManager:
if self.fKeyboardInterrupt:
raise KeyboardInterrupt
def run(self):
def run(self) -> None:
"""Starts the task manager running. Does not return until an
exception is encountered (including KeyboardInterrupt). """
@ -567,11 +617,11 @@ class TaskManager:
if len(self._frameProfileQueue) > 0:
numFrames, session, callback = self._frameProfileQueue.pop(0)
def _profileFunc(numFrames=numFrames):
def _profileFunc(numFrames: int = numFrames) -> None:
self._doProfiledFrames(numFrames)
session.setFunc(_profileFunc)
session.run()
_profileFunc = None
del _profileFunc
if callback:
callback()
session.release()
@ -624,7 +674,7 @@ class TaskManager:
message = ioError
return code, message
def stop(self):
def stop(self) -> None:
# Set a flag so we will stop before beginning next frame
self.running = False
@ -789,12 +839,12 @@ class TaskManager:
task = tasks.getTask(i)
return task
def __repr__(self):
def __repr__(self) -> str:
return str(self.mgr)
# In the event we want to do frame time managment, this is the
# function to replace or overload.
def doYield(self, frameStartTime, nextScheduledTaskTime):
def doYield(self, frameStartTime: float, nextScheduledTaskTime: float) -> None:
pass
#def doYieldExample(self, frameStartTime, nextScheduledTaskTime):

View File

@ -1,22 +1,82 @@
import pytest
from panda3d import core
from direct.task import Task
def test_TaskManager():
tm = Task.TaskManager()
tm.mgr = core.AsyncTaskManager("Test manager")
tm.setClock(core.ClockObject())
tm.setupTaskChain("default", tickClock = True)
TASK_NAME = 'Arbitrary task name'
TASK_CHAIN_NAME = 'Arbitrary task chain name'
tm._startTrackingMemLeaks = lambda: None
tm._stopTrackingMemLeaks = lambda: None
tm._checkMemLeaks = lambda: None
# check for memory leaks after every test
tm._startTrackingMemLeaks()
tm._checkMemLeaks()
def DUMMY_FUNCTION(*_):
pass
@pytest.fixture
def task_manager():
manager = Task.TaskManager()
manager.mgr = core.AsyncTaskManager('Test manager')
manager.clock = core.ClockObject()
manager.setupTaskChain('default', tickClock=True)
manager.finalInit()
yield manager
manager.destroy()
def test_sequence(task_manager):
numbers = []
def append_1(task):
numbers.append(1)
def append_2(task):
numbers.append(2)
sequence = Task.sequence(core.PythonTask(append_1), core.PythonTask(append_2))
task_manager.add(sequence)
for _ in range(3):
task_manager.step()
assert not task_manager.getTasks()
assert numbers == [1, 2]
def test_loop(task_manager):
numbers = []
def append_1(task):
numbers.append(1)
def append_2(task):
numbers.append(2)
loop = Task.loop(core.PythonTask(append_1), core.PythonTask(append_2))
task_manager.add(loop)
for _ in range(5):
task_manager.step()
assert numbers == [1, 2, 1, 2]
def test_get_current_task(task_manager):
def check_current_task(task):
assert task_manager.getCurrentTask().name == TASK_NAME
task_manager.add(check_current_task, TASK_NAME)
assert len(task_manager.getTasks()) == 1
assert task_manager.getCurrentTask() is None
task_manager.step()
assert len(task_manager.getTasks()) == 0
assert task_manager.getCurrentTask() is None
def test_has_task_chain(task_manager):
assert not task_manager.hasTaskChain(TASK_CHAIN_NAME)
task_manager.setupTaskChain(TASK_CHAIN_NAME)
assert task_manager.hasTaskChain(TASK_CHAIN_NAME)
def test_done(task_manager):
# run-once task
tm = task_manager
l = []
def _testDone(task, l=l):
@ -27,28 +87,31 @@ def test_TaskManager():
assert len(l) == 1
tm.step()
assert len(l) == 1
_testDone = None
tm._checkMemLeaks()
def test_remove_by_name(task_manager):
# remove by name
tm = task_manager
def _testRemoveByName(task):
return task.done
tm.add(_testRemoveByName, 'testRemoveByName')
assert tm.remove('testRemoveByName') == 1
assert tm.remove('testRemoveByName') == 0
_testRemoveByName = None
tm._checkMemLeaks()
def test_duplicate_named_tasks(task_manager):
# duplicate named tasks
tm = task_manager
def _testDupNamedTasks(task):
return task.done
tm.add(_testDupNamedTasks, 'testDupNamedTasks')
tm.add(_testDupNamedTasks, 'testDupNamedTasks')
assert tm.remove('testRemoveByName') == 0
_testDupNamedTasks = None
tm._checkMemLeaks()
def test_continued_task(task_manager):
# continued task
tm = task_manager
l = []
def _testCont(task, l = l):
@ -60,10 +123,11 @@ def test_TaskManager():
tm.step()
assert len(l) == 2
tm.remove('testCont')
_testCont = None
tm._checkMemLeaks()
def test_continue_until_done(task_manager):
# continue until done task
tm = task_manager
l = []
def _testContDone(task, l = l):
@ -80,20 +144,22 @@ def test_TaskManager():
tm.step()
assert len(l) == 2
assert not tm.hasTaskNamed('testContDone')
_testContDone = None
tm._checkMemLeaks()
def test_has_task_named(task_manager):
# hasTaskNamed
tm = task_manager
def _testHasTaskNamed(task):
return task.done
tm.add(_testHasTaskNamed, 'testHasTaskNamed')
assert tm.hasTaskNamed('testHasTaskNamed')
tm.step()
assert not tm.hasTaskNamed('testHasTaskNamed')
_testHasTaskNamed = None
tm._checkMemLeaks()
def test_task_sort(task_manager):
# task sort
tm = task_manager
l = []
def _testPri1(task, l = l):
@ -113,11 +179,11 @@ def test_TaskManager():
assert l == [1, 2, 1, 2,]
tm.remove('testPri1')
tm.remove('testPri2')
_testPri1 = None
_testPri2 = None
tm._checkMemLeaks()
def test_extra_args(task_manager):
# task extraArgs
tm = task_manager
l = []
def _testExtraArgs(arg1, arg2, l=l):
@ -127,10 +193,11 @@ def test_TaskManager():
tm.step()
assert len(l) == 2
assert l == [4, 5,]
_testExtraArgs = None
tm._checkMemLeaks()
def test_append_task(task_manager):
# task appendTask
tm = task_manager
l = []
def _testAppendTask(arg1, arg2, task, l=l):
@ -140,10 +207,11 @@ def test_TaskManager():
tm.step()
assert len(l) == 2
assert l == [4, 5,]
_testAppendTask = None
tm._checkMemLeaks()
def test_task_upon_death(task_manager):
# task uponDeath
tm = task_manager
l = []
def _uponDeathFunc(task, l=l):
@ -155,11 +223,11 @@ def test_TaskManager():
tm.step()
assert len(l) == 1
assert l == ['testUponDeath']
_testUponDeath = None
_uponDeathFunc = None
tm._checkMemLeaks()
def test_task_owner(task_manager):
# task owner
tm = task_manager
class _TaskOwner:
def _addTask(self, task):
self.addedTaskName = task.name
@ -175,11 +243,10 @@ def test_TaskManager():
tm.step()
assert getattr(to, 'addedTaskName', None) == 'testOwner'
assert getattr(to, 'clearedTaskName', None) == 'testOwner'
_testOwner = None
del to
_TaskOwner = None
tm._checkMemLeaks()
def test_do_laters(task_manager):
tm = task_manager
doLaterTests = [0,]
# doLater
@ -205,8 +272,6 @@ def test_TaskManager():
_testDoLater1 = None
_testDoLater2 = None
_monitorDoLater = None
# don't check until all the doLaters are finished
#tm._checkMemLeaks()
# doLater sort
l = []
@ -231,8 +296,6 @@ def test_TaskManager():
_testDoLaterPri1 = None
_testDoLaterPri2 = None
_monitorDoLaterPri = None
# don't check until all the doLaters are finished
#tm._checkMemLeaks()
# doLater extraArgs
l = []
@ -252,8 +315,6 @@ def test_TaskManager():
tm.add(_monitorDoLaterExtraArgs, 'monitorDoLaterExtraArgs', sort=10)
_testDoLaterExtraArgs = None
_monitorDoLaterExtraArgs = None
# don't check until all the doLaters are finished
#tm._checkMemLeaks()
# doLater appendTask
l = []
@ -275,8 +336,6 @@ def test_TaskManager():
tm.add(_monitorDoLaterAppendTask, 'monitorDoLaterAppendTask', sort=10)
_testDoLaterAppendTask = None
_monitorDoLaterAppendTask = None
# don't check until all the doLaters are finished
#tm._checkMemLeaks()
# doLater uponDeath
l = []
@ -302,8 +361,6 @@ def test_TaskManager():
_testUponDeathFunc = None
_testDoLaterUponDeath = None
_monitorDoLaterUponDeath = None
# don't check until all the doLaters are finished
#tm._checkMemLeaks()
# doLater owner
class _DoLaterOwner:
@ -335,15 +392,15 @@ def test_TaskManager():
_monitorDoLaterOwner = None
del doLaterOwner
_DoLaterOwner = None
# don't check until all the doLaters are finished
#tm._checkMemLeaks()
# run the doLater tests
while doLaterTests[0] > 0:
tm.step()
del doLaterTests
tm._checkMemLeaks()
def test_get_tasks(task_manager):
tm = task_manager
# getTasks
def _testGetTasks(task):
return task.cont
@ -361,9 +418,10 @@ def test_TaskManager():
tm.remove('testGetTasks1')
tm.remove('testGetTasks3')
assert len(tm.getTasks()) == 0
_testGetTasks = None
tm._checkMemLeaks()
def test_get_do_laters(task_manager):
tm = task_manager
# getDoLaters
def _testGetDoLaters():
pass
@ -379,9 +437,18 @@ def test_TaskManager():
tm.remove('testDoLater1')
tm.remove('testDoLater3')
assert len(tm.getDoLaters()) == 0
_testGetDoLaters = None
tm._checkMemLeaks()
def test_get_all_tasks(task_manager):
active_task = task_manager.add(DUMMY_FUNCTION, delay=None)
sleeping_task = task_manager.add(DUMMY_FUNCTION, delay=1)
assert task_manager.getTasks() == [active_task]
assert task_manager.getDoLaters() == [sleeping_task]
assert task_manager.getAllTasks() in ([active_task, sleeping_task], [sleeping_task, active_task])
def test_duplicate_named_do_laters(task_manager):
tm = task_manager
# duplicate named doLaters removed via taskMgr.remove
def _testDupNameDoLaters():
pass
@ -391,9 +458,10 @@ def test_TaskManager():
assert len(tm.getDoLaters()) == 2
tm.remove('testDupNameDoLater')
assert len(tm.getDoLaters()) == 0
_testDupNameDoLaters = None
tm._checkMemLeaks()
def test_duplicate_named_do_laters_remove(task_manager):
tm = task_manager
# duplicate named doLaters removed via remove()
def _testDupNameDoLatersRemove():
pass
@ -405,10 +473,10 @@ def test_TaskManager():
assert len(tm.getDoLaters()) == 1
dl1.remove()
assert len(tm.getDoLaters()) == 0
_testDupNameDoLatersRemove = None
# nameDict etc. isn't cleared out right away with task.remove()
tm._checkMemLeaks()
def test_get_tasks_named(task_manager):
tm = task_manager
# getTasksNamed
def _testGetTasksNamed(task):
return task.cont
@ -421,9 +489,20 @@ def test_TaskManager():
assert len(tm.getTasksNamed('testGetTasksNamed')) == 3
tm.remove('testGetTasksNamed')
assert len(tm.getTasksNamed('testGetTasksNamed')) == 0
_testGetTasksNamed = None
tm._checkMemLeaks()
def test_get_tasks_matching(task_manager):
task_manager.add(DUMMY_FUNCTION, 'task_1')
task_manager.add(DUMMY_FUNCTION, 'task_2')
task_manager.add(DUMMY_FUNCTION, 'another_task')
assert len(task_manager.getTasksMatching('task_?')) == 2
assert len(task_manager.getTasksMatching('*_task')) == 1
assert len(task_manager.getTasksMatching('*task*')) == 3
def test_remove_tasks_matching(task_manager):
tm = task_manager
# removeTasksMatching
def _testRemoveTasksMatching(task):
return task.cont
@ -445,9 +524,10 @@ def test_TaskManager():
tm.removeTasksMatching('testRemoveTasksMatching?a')
assert len(tm.getTasksNamed('testRemoveTasksMatching1a')) == 0
assert len(tm.getTasksNamed('testRemoveTasksMatching2a')) == 0
_testRemoveTasksMatching = None
tm._checkMemLeaks()
def test_task_obj(task_manager):
tm = task_manager
# create Task object and add to mgr
l = []
@ -463,9 +543,10 @@ def test_TaskManager():
tm.remove('testTaskObj')
tm.step()
assert len(l) == 2
_testTaskObj = None
tm._checkMemLeaks()
def test_task_remove(task_manager):
tm = task_manager
# remove Task via task.remove()
l = []
@ -482,9 +563,10 @@ def test_TaskManager():
tm.step()
assert len(l) == 2
del t
_testTaskObjRemove = None
tm._checkMemLeaks()
def test_task_get_sort(task_manager):
tm = task_manager
# set/get Task sort
l = []
def _testTaskObjSort(arg, task, l=l):
@ -508,11 +590,3 @@ def test_TaskManager():
t2.remove()
tm.step()
assert len(l) == 4
del t1
del t2
_testTaskObjSort = None
tm._checkMemLeaks()
del l
tm.destroy()
del tm