added TaskManager unit tests

This commit is contained in:
Darren Ranalli 2008-05-05 22:17:04 +00:00
parent 0b5894f27e
commit fa1bec2af7

View File

@ -401,6 +401,13 @@ class TaskManager:
# A default task. # A default task.
self.add(self.__doLaterProcessor, "doLaterProcessor", -10) self.add(self.__doLaterProcessor, "doLaterProcessor", -10)
def destroy(self):
del self.nameDict
del self.trueClock
del self.globalClock
del self.__doLaterList
del self.pendingTaskDict
del self.taskList
def setStepping(self, value): def setStepping(self, value):
self.stepping = value self.stepping = value
@ -1153,6 +1160,32 @@ class TaskManager:
return self.globalClock.getFrameTime() return self.globalClock.getFrameTime()
return self.trueClock.getShortTime() return self.trueClock.getShortTime()
if __debug__:
# to catch memory leaks during the tests at the bottom of the file
def _startTrackingMemLeaks(self):
self._memUsage = ScratchPad()
mu = self._memUsage
mu.lenTaskList = len(self.taskList)
mu.lenPendingTaskDict = len(self.pendingTaskDict)
mu.lenDoLaterList = len(self.__doLaterList)
mu.lenNameDict = len(self.nameDict)
def _stopTrackingMemLeaks(self):
self._memUsage.destroy()
del self._memUsage
def _checkMemLeaks(self):
# give the mgr a chance to clear out removed tasks
self.step()
mu = self._memUsage
# the task list looks like it grows and never shrinks, replacing finished
# tasks with 'None' in the TaskPriorityLists.
# TODO: look at reducing memory usage here--clear out excess at the end of every frame?
#assert mu.lenTaskList == len(self.taskList)
assert mu.lenPendingTaskDict == len(self.pendingTaskDict)
assert mu.lenDoLaterList == len(self.__doLaterList)
assert mu.lenNameDict == len(self.nameDict)
def startOsd(self): def startOsd(self):
self.add(self.doOsd, 'taskMgr.doOsd') self.add(self.doOsd, 'taskMgr.doOsd')
self._osdEnabled = None self._osdEnabled = None
@ -1218,6 +1251,455 @@ cont = Task.cont
again = Task.again again = Task.again
if __debug__:
# keep everything in a function namespace so it's easier to clean up
def runTests():
tm = TaskManager()
# looks like nothing runs on the first frame...?
# step to get past the first frame
tm.step()
# check for memory leaks after every test
tm._startTrackingMemLeaks()
tm._checkMemLeaks()
# run-once task
l = []
def _testDone(task, l=l):
l.append(None)
return task.done
tm.add(_testDone, 'testDone')
tm.step()
assert len(l) == 1
tm.step()
assert len(l) == 1
_testDone = None
tm._checkMemLeaks()
# remove by name
def _testRemoveByName(task):
return task.done
tm.add(_testRemoveByName, 'testRemoveByName')
assert tm.remove('testRemoveByName') == 1
assert tm.remove('testRemoveByName') == 0
_testRemoveByName = None
tm._checkMemLeaks()
# continued task
l = []
def _testCont(task, l = l):
l.append(None)
return task.cont
tm.add(_testCont, 'testCont')
tm.step()
assert len(l) == 1
tm.step()
assert len(l) == 2
tm.remove('testCont')
_testCont = None
tm._checkMemLeaks()
# continue until done task
l = []
def _testContDone(task, l = l):
l.append(None)
if len(l) >= 2:
return task.done
else:
return task.cont
tm.add(_testContDone, 'testContDone')
tm.step()
assert len(l) == 1
tm.step()
assert len(l) == 2
tm.step()
assert len(l) == 2
assert not tm.hasTaskNamed('testContDone')
_testContDone = None
tm._checkMemLeaks()
# hasTaskNamed
def _testHasTaskNamed(task):
return task.done
tm.add(_testHasTaskNamed, 'testHasTaskNamed')
assert tm.hasTaskNamed('testHasTaskNamed')
tm.step()
assert not tm.hasTaskNamed('testHasTaskNamed')
_testHasTaskNamed = None
tm._checkMemLeaks()
# task priority
l = []
def _testPri1(task, l = l):
l.append(1)
return task.cont
def _testPri2(task, l = l):
l.append(2)
return task.cont
tm.add(_testPri1, 'testPri1', priority = 1)
tm.add(_testPri2, 'testPri2', priority = 2)
tm.step()
assert len(l) == 2
assert l == [1, 2,]
tm.step()
assert len(l) == 4
assert l == [1, 2, 1, 2,]
tm.remove('testPri1')
tm.remove('testPri2')
_testPri1 = None
_testPri2 = None
tm._checkMemLeaks()
# task extraArgs
l = []
def _testExtraArgs(arg1, arg2, l=l):
l.extend([arg1, arg2,])
return done
tm.add(_testExtraArgs, 'testExtraArgs', extraArgs=[4,5])
tm.step()
assert len(l) == 2
assert l == [4, 5,]
_testExtraArgs = None
tm._checkMemLeaks()
# task appendTask
l = []
def _testAppendTask(arg1, arg2, task, l=l):
l.extend([arg1, arg2,])
return task.done
tm.add(_testAppendTask, '_testAppendTask', extraArgs=[4,5], appendTask=True)
tm.step()
assert len(l) == 2
assert l == [4, 5,]
_testAppendTask = None
tm._checkMemLeaks()
# task uponDeath
l = []
def _uponDeathFunc(task, l=l):
l.append(task.name)
def _testUponDeath(task):
return done
tm.add(_testUponDeath, 'testUponDeath', uponDeath=_uponDeathFunc)
tm.step()
assert len(l) == 1
assert l == ['testUponDeath']
_testUponDeath = None
_uponDeathFunc = None
tm._checkMemLeaks()
# task owner
class _TaskOwner:
def _clearTask(self, task):
self.clearedTaskName = task.name
to = _TaskOwner()
l = []
def _testOwner(task):
return done
tm.add(_testOwner, 'testOwner', owner=to)
tm.step()
assert hasattr(to, 'clearedTaskName')
assert to.clearedTaskName == 'testOwner'
_testOwner = None
del to
_TaskOwner = None
tm._checkMemLeaks()
doLaterTests = [0,]
# doLater
l = []
def _testDoLater1(task, l=l):
l.append(1)
def _testDoLater2(task, l=l):
l.append(2)
def _monitorDoLater(task, tm=tm, l=l, doLaterTests=doLaterTests):
if task.time > .03:
assert l == [1, 2,]
doLaterTests[0] -= 1
return task.done
return task.cont
tm.doMethodLater(.01, _testDoLater1, 'testDoLater1')
tm.doMethodLater(.02, _testDoLater2, 'testDoLater2')
doLaterTests[0] += 1
tm.add(_monitorDoLater, 'monitorDoLater')
_testDoLater1 = None
_testDoLater2 = None
_monitorDoLater = None
# don't check until all the doLaters are finished
#tm._checkMemLeaks()
# doLater priority
l = []
def _testDoLaterPri1(task, l=l):
l.append(1)
def _testDoLaterPri2(task, l=l):
l.append(2)
def _monitorDoLaterPri(task, tm=tm, l=l, doLaterTests=doLaterTests):
if task.time > .02:
assert l == [1, 2,]
doLaterTests[0] -= 1
return task.done
return task.cont
tm.doMethodLater(.01, _testDoLaterPri1, 'testDoLaterPri1', priority=1)
tm.doMethodLater(.01, _testDoLaterPri2, 'testDoLaterPri2', priority=2)
doLaterTests[0] += 1
tm.add(_monitorDoLaterPri, 'monitorDoLaterPri')
_testDoLaterPri1 = None
_testDoLaterPri2 = None
_monitorDoLaterPri = None
# don't check until all the doLaters are finished
#tm._checkMemLeaks()
# doLater extraArgs
l = []
def _testDoLaterExtraArgs(arg1, l=l):
l.append(arg1)
def _monitorDoLaterExtraArgs(task, tm=tm, l=l, doLaterTests=doLaterTests):
if task.time > .02:
assert l == [3,]
doLaterTests[0] -= 1
return task.done
return task.cont
tm.doMethodLater(.01, _testDoLaterExtraArgs, 'testDoLaterExtraArgs', extraArgs=[3,])
doLaterTests[0] += 1
tm.add(_monitorDoLaterExtraArgs, 'monitorDoLaterExtraArgs')
_testDoLaterExtraArgs = None
_monitorDoLaterExtraArgs = None
# don't check until all the doLaters are finished
#tm._checkMemLeaks()
# doLater appendTask
l = []
def _testDoLaterAppendTask(arg1, task, l=l):
assert task.name == 'testDoLaterAppendTask'
l.append(arg1)
def _monitorDoLaterAppendTask(task, tm=tm, l=l, doLaterTests=doLaterTests):
if task.time > .02:
assert l == [4,]
doLaterTests[0] -= 1
return task.done
return task.cont
tm.doMethodLater(.01, _testDoLaterAppendTask, 'testDoLaterAppendTask',
extraArgs=[4,], appendTask=True)
doLaterTests[0] += 1
tm.add(_monitorDoLaterAppendTask, 'monitorDoLaterAppendTask')
_testDoLaterAppendTask = None
_monitorDoLaterAppendTask = None
# don't check until all the doLaters are finished
#tm._checkMemLeaks()
# doLater uponDeath
l = []
def _testUponDeathFunc(task, l=l):
assert task.name == 'testDoLaterUponDeath'
l.append(10)
def _testDoLaterUponDeath(arg1, l=l):
return done
def _monitorDoLaterUponDeath(task, tm=tm, l=l, doLaterTests=doLaterTests):
if task.time > .02:
assert l == [10,]
doLaterTests[0] -= 1
return task.done
return task.cont
tm.doMethodLater(.01, _testDoLaterUponDeath, 'testDoLaterUponDeath',
uponDeath=_testUponDeathFunc)
doLaterTests[0] += 1
tm.add(_monitorDoLaterUponDeath, 'monitorDoLaterUponDeath')
_testUponDeathFunc = None
_testDoLaterUponDeath = None
_monitorDoLaterUponDeath = None
# don't check until all the doLaters are finished
#tm._checkMemLeaks()
# doLater owner
class _DoLaterOwner:
def _clearTask(self, task):
self.clearedTaskName = task.name
doLaterOwner = _DoLaterOwner()
l = []
def _testDoLaterOwner(l=l):
pass
def _monitorDoLaterOwner(task, tm=tm, l=l, doLaterOwner=doLaterOwner,
doLaterTests=doLaterTests):
if task.time > .02:
assert hasattr(doLaterOwner, 'clearedTaskName')
assert doLaterOwner.clearedTaskName == 'testDoLaterOwner'
doLaterTests[0] -= 1
return task.done
return task.cont
tm.doMethodLater(.01, _testDoLaterOwner, 'testDoLaterOwner',
owner=doLaterOwner)
doLaterTests[0] += 1
tm.add(_monitorDoLaterOwner, 'monitorDoLaterOwner')
_testDoLaterOwner = None
_monitorDoLaterOwner = None
del doLaterOwner
_DoLaterOwner = None
# don't check until all the doLaters are finished
#tm._checkMemLeaks()
# run the doLater tests
while doLaterTests[0] > 0:
tm.step()
del doLaterTests
tm._checkMemLeaks()
# getTasks
def _testGetTasks(task):
return task.cont
# the doLaterProcessor is always running
assert len(tm.getTasks()) == 1
tm.add(_testGetTasks, 'testGetTasks1')
assert len(tm.getTasks()) == 2
assert (tm.getTasks()[0].name == 'testGetTasks1' or
tm.getTasks()[1].name == 'testGetTasks1')
tm.add(_testGetTasks, 'testGetTasks2')
tm.add(_testGetTasks, 'testGetTasks3')
assert len(tm.getTasks()) == 4
tm.remove('testGetTasks2')
assert len(tm.getTasks()) == 3
tm.remove('testGetTasks1')
tm.remove('testGetTasks3')
assert len(tm.getTasks()) == 1
_testGetTasks = None
tm._checkMemLeaks()
# getDoLaters
def _testGetDoLaters():
pass
# the doLaterProcessor is always running
assert len(tm.getDoLaters()) == 0
tm.doMethodLater(.1, _testGetDoLaters, 'testDoLater1')
assert len(tm.getDoLaters()) == 1
assert tm.getDoLaters()[0].name == 'testDoLater1'
tm.doMethodLater(.1, _testGetDoLaters, 'testDoLater2')
tm.doMethodLater(.1, _testGetDoLaters, 'testDoLater3')
assert len(tm.getDoLaters()) == 3
tm.remove('testDoLater2')
assert len(tm.getDoLaters()) == 2
tm.remove('testDoLater1')
tm.remove('testDoLater3')
assert len(tm.getDoLaters()) == 0
_testGetTasks = None
tm._checkMemLeaks()
# getTasksNamed
def _testGetTasksNamed(task):
return task.cont
assert len(tm.getTasksNamed('testGetTasksNamed')) == 0
tm.add(_testGetTasksNamed, 'testGetTasksNamed')
assert len(tm.getTasksNamed('testGetTasksNamed')) == 1
assert tm.getTasksNamed('testGetTasksNamed')[0].name == 'testGetTasksNamed'
tm.add(_testGetTasksNamed, 'testGetTasksNamed')
tm.add(_testGetTasksNamed, 'testGetTasksNamed')
assert len(tm.getTasksNamed('testGetTasksNamed')) == 3
tm.remove('testGetTasksNamed')
assert len(tm.getTasksNamed('testGetTasksNamed')) == 0
_testGetTasksNamed = None
tm._checkMemLeaks()
# removeTasksMatching
def _testRemoveTasksMatching(task):
return task.cont
tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching')
assert len(tm.getTasksNamed('testRemoveTasksMatching')) == 1
tm.removeTasksMatching('testRemoveTasksMatching')
assert len(tm.getTasksNamed('testRemoveTasksMatching')) == 0
tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching1')
tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching2')
assert len(tm.getTasksNamed('testRemoveTasksMatching1')) == 1
assert len(tm.getTasksNamed('testRemoveTasksMatching2')) == 1
tm.removeTasksMatching('testRemoveTasksMatching*')
assert len(tm.getTasksNamed('testRemoveTasksMatching1')) == 0
assert len(tm.getTasksNamed('testRemoveTasksMatching2')) == 0
tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching1a')
tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching2a')
assert len(tm.getTasksNamed('testRemoveTasksMatching1a')) == 1
assert len(tm.getTasksNamed('testRemoveTasksMatching2a')) == 1
tm.removeTasksMatching('testRemoveTasksMatching?a')
assert len(tm.getTasksNamed('testRemoveTasksMatching1a')) == 0
assert len(tm.getTasksNamed('testRemoveTasksMatching2a')) == 0
_testRemoveTasksMatching = None
tm._checkMemLeaks()
# create Task object and add to mgr
l = []
def _testTaskObj(task, l=l):
l.append(None)
return task.cont
t = Task(_testTaskObj)
tm.add(t, 'testTaskObj')
tm.step()
assert len(l) == 1
tm.step()
assert len(l) == 2
tm.remove('testTaskObj')
tm.step()
assert len(l) == 2
_testTaskObj = None
tm._checkMemLeaks()
# remove Task via task.remove()
l = []
def _testTaskObjRemove(task, l=l):
l.append(None)
return task.cont
t = Task(_testTaskObjRemove)
tm.add(t, 'testTaskObjRemove')
tm.step()
assert len(l) == 1
tm.step()
assert len(l) == 2
t.remove()
tm.step()
assert len(l) == 2
del t
_testTaskObjRemove = None
tm._checkMemLeaks()
"""
# this test fails, and it's not clear what the correct behavior should be.
# priority passed to Task.__init__ is always overridden by taskMgr.add()
# even if no priority is specified, and calling Task.setPriority() has no
# effect on the taskMgr's behavior.
# set/get Task priority
l = []
def _testTaskObjPriority(arg, task, l=l):
l.append(arg)
return task.cont
t1 = Task(_testTaskObjPriority, priority=1)
t2 = Task(_testTaskObjPriority, priority=2)
tm.add(t1, 'testTaskObjPriority1', extraArgs=['a',], appendTask=True)
tm.add(t2, 'testTaskObjPriority2', extraArgs=['b',], appendTask=True)
tm.step()
assert len(l) == 2
assert l == ['a', 'b']
assert t1.getPriority() == 1
assert t2.getPriority() == 2
t1.setPriority(3)
assert t1.getPriority() == 3
tm.step()
assert len(l) == 4
assert l == ['a', 'b', 'b', 'a',]
t1.remove()
t2.remove()
tm.step()
assert len(l) == 4
del t1
del t2
_testTaskObjPriority = None
tm._checkMemLeaks()
"""
del l
tm.destroy()
del tm
runTests()
del runTests
""" """
import Task import Task