From fa1bec2af7c188d572ff8d2e2c7c458a3770c097 Mon Sep 17 00:00:00 2001 From: Darren Ranalli Date: Mon, 5 May 2008 22:17:04 +0000 Subject: [PATCH] added TaskManager unit tests --- direct/src/task/Task.py | 482 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 482 insertions(+) diff --git a/direct/src/task/Task.py b/direct/src/task/Task.py index 483f8db728..6b5d0e032f 100644 --- a/direct/src/task/Task.py +++ b/direct/src/task/Task.py @@ -401,6 +401,13 @@ class TaskManager: # A default task. self.add(self.__doLaterProcessor, "doLaterProcessor", -10) + def destroy(self): + del self.nameDict + del self.trueClock + del self.globalClock + del self.__doLaterList + del self.pendingTaskDict + del self.taskList def setStepping(self, value): self.stepping = value @@ -1153,6 +1160,32 @@ class TaskManager: return self.globalClock.getFrameTime() return self.trueClock.getShortTime() + if __debug__: + # to catch memory leaks during the tests at the bottom of the file + def _startTrackingMemLeaks(self): + self._memUsage = ScratchPad() + mu = self._memUsage + mu.lenTaskList = len(self.taskList) + mu.lenPendingTaskDict = len(self.pendingTaskDict) + mu.lenDoLaterList = len(self.__doLaterList) + mu.lenNameDict = len(self.nameDict) + + def _stopTrackingMemLeaks(self): + self._memUsage.destroy() + del self._memUsage + + def _checkMemLeaks(self): + # give the mgr a chance to clear out removed tasks + self.step() + mu = self._memUsage + # the task list looks like it grows and never shrinks, replacing finished + # tasks with 'None' in the TaskPriorityLists. + # TODO: look at reducing memory usage here--clear out excess at the end of every frame? + #assert mu.lenTaskList == len(self.taskList) + assert mu.lenPendingTaskDict == len(self.pendingTaskDict) + assert mu.lenDoLaterList == len(self.__doLaterList) + assert mu.lenNameDict == len(self.nameDict) + def startOsd(self): self.add(self.doOsd, 'taskMgr.doOsd') self._osdEnabled = None @@ -1218,6 +1251,455 @@ cont = Task.cont again = Task.again +if __debug__: + # keep everything in a function namespace so it's easier to clean up + def runTests(): + tm = TaskManager() + # looks like nothing runs on the first frame...? + # step to get past the first frame + tm.step() + + # check for memory leaks after every test + tm._startTrackingMemLeaks() + tm._checkMemLeaks() + + # run-once task + l = [] + def _testDone(task, l=l): + l.append(None) + return task.done + tm.add(_testDone, 'testDone') + tm.step() + assert len(l) == 1 + tm.step() + assert len(l) == 1 + _testDone = None + tm._checkMemLeaks() + + # remove by name + def _testRemoveByName(task): + return task.done + tm.add(_testRemoveByName, 'testRemoveByName') + assert tm.remove('testRemoveByName') == 1 + assert tm.remove('testRemoveByName') == 0 + _testRemoveByName = None + tm._checkMemLeaks() + + # continued task + l = [] + def _testCont(task, l = l): + l.append(None) + return task.cont + tm.add(_testCont, 'testCont') + tm.step() + assert len(l) == 1 + tm.step() + assert len(l) == 2 + tm.remove('testCont') + _testCont = None + tm._checkMemLeaks() + + # continue until done task + l = [] + def _testContDone(task, l = l): + l.append(None) + if len(l) >= 2: + return task.done + else: + return task.cont + tm.add(_testContDone, 'testContDone') + tm.step() + assert len(l) == 1 + tm.step() + assert len(l) == 2 + tm.step() + assert len(l) == 2 + assert not tm.hasTaskNamed('testContDone') + _testContDone = None + tm._checkMemLeaks() + + # hasTaskNamed + def _testHasTaskNamed(task): + return task.done + tm.add(_testHasTaskNamed, 'testHasTaskNamed') + assert tm.hasTaskNamed('testHasTaskNamed') + tm.step() + assert not tm.hasTaskNamed('testHasTaskNamed') + _testHasTaskNamed = None + tm._checkMemLeaks() + + # task priority + l = [] + def _testPri1(task, l = l): + l.append(1) + return task.cont + def _testPri2(task, l = l): + l.append(2) + return task.cont + tm.add(_testPri1, 'testPri1', priority = 1) + tm.add(_testPri2, 'testPri2', priority = 2) + tm.step() + assert len(l) == 2 + assert l == [1, 2,] + tm.step() + assert len(l) == 4 + assert l == [1, 2, 1, 2,] + tm.remove('testPri1') + tm.remove('testPri2') + _testPri1 = None + _testPri2 = None + tm._checkMemLeaks() + + # task extraArgs + l = [] + def _testExtraArgs(arg1, arg2, l=l): + l.extend([arg1, arg2,]) + return done + tm.add(_testExtraArgs, 'testExtraArgs', extraArgs=[4,5]) + tm.step() + assert len(l) == 2 + assert l == [4, 5,] + _testExtraArgs = None + tm._checkMemLeaks() + + # task appendTask + l = [] + def _testAppendTask(arg1, arg2, task, l=l): + l.extend([arg1, arg2,]) + return task.done + tm.add(_testAppendTask, '_testAppendTask', extraArgs=[4,5], appendTask=True) + tm.step() + assert len(l) == 2 + assert l == [4, 5,] + _testAppendTask = None + tm._checkMemLeaks() + + # task uponDeath + l = [] + def _uponDeathFunc(task, l=l): + l.append(task.name) + def _testUponDeath(task): + return done + tm.add(_testUponDeath, 'testUponDeath', uponDeath=_uponDeathFunc) + tm.step() + assert len(l) == 1 + assert l == ['testUponDeath'] + _testUponDeath = None + _uponDeathFunc = None + tm._checkMemLeaks() + + # task owner + class _TaskOwner: + def _clearTask(self, task): + self.clearedTaskName = task.name + to = _TaskOwner() + l = [] + def _testOwner(task): + return done + tm.add(_testOwner, 'testOwner', owner=to) + tm.step() + assert hasattr(to, 'clearedTaskName') + assert to.clearedTaskName == 'testOwner' + _testOwner = None + del to + _TaskOwner = None + tm._checkMemLeaks() + + + doLaterTests = [0,] + + # doLater + l = [] + def _testDoLater1(task, l=l): + l.append(1) + def _testDoLater2(task, l=l): + l.append(2) + def _monitorDoLater(task, tm=tm, l=l, doLaterTests=doLaterTests): + if task.time > .03: + assert l == [1, 2,] + doLaterTests[0] -= 1 + return task.done + return task.cont + tm.doMethodLater(.01, _testDoLater1, 'testDoLater1') + tm.doMethodLater(.02, _testDoLater2, 'testDoLater2') + doLaterTests[0] += 1 + tm.add(_monitorDoLater, 'monitorDoLater') + _testDoLater1 = None + _testDoLater2 = None + _monitorDoLater = None + # don't check until all the doLaters are finished + #tm._checkMemLeaks() + + # doLater priority + l = [] + def _testDoLaterPri1(task, l=l): + l.append(1) + def _testDoLaterPri2(task, l=l): + l.append(2) + def _monitorDoLaterPri(task, tm=tm, l=l, doLaterTests=doLaterTests): + if task.time > .02: + assert l == [1, 2,] + doLaterTests[0] -= 1 + return task.done + return task.cont + tm.doMethodLater(.01, _testDoLaterPri1, 'testDoLaterPri1', priority=1) + tm.doMethodLater(.01, _testDoLaterPri2, 'testDoLaterPri2', priority=2) + doLaterTests[0] += 1 + tm.add(_monitorDoLaterPri, 'monitorDoLaterPri') + _testDoLaterPri1 = None + _testDoLaterPri2 = None + _monitorDoLaterPri = None + # don't check until all the doLaters are finished + #tm._checkMemLeaks() + + # doLater extraArgs + l = [] + def _testDoLaterExtraArgs(arg1, l=l): + l.append(arg1) + def _monitorDoLaterExtraArgs(task, tm=tm, l=l, doLaterTests=doLaterTests): + if task.time > .02: + assert l == [3,] + doLaterTests[0] -= 1 + return task.done + return task.cont + tm.doMethodLater(.01, _testDoLaterExtraArgs, 'testDoLaterExtraArgs', extraArgs=[3,]) + doLaterTests[0] += 1 + tm.add(_monitorDoLaterExtraArgs, 'monitorDoLaterExtraArgs') + _testDoLaterExtraArgs = None + _monitorDoLaterExtraArgs = None + # don't check until all the doLaters are finished + #tm._checkMemLeaks() + + # doLater appendTask + l = [] + def _testDoLaterAppendTask(arg1, task, l=l): + assert task.name == 'testDoLaterAppendTask' + l.append(arg1) + def _monitorDoLaterAppendTask(task, tm=tm, l=l, doLaterTests=doLaterTests): + if task.time > .02: + assert l == [4,] + doLaterTests[0] -= 1 + return task.done + return task.cont + tm.doMethodLater(.01, _testDoLaterAppendTask, 'testDoLaterAppendTask', + extraArgs=[4,], appendTask=True) + doLaterTests[0] += 1 + tm.add(_monitorDoLaterAppendTask, 'monitorDoLaterAppendTask') + _testDoLaterAppendTask = None + _monitorDoLaterAppendTask = None + # don't check until all the doLaters are finished + #tm._checkMemLeaks() + + # doLater uponDeath + l = [] + def _testUponDeathFunc(task, l=l): + assert task.name == 'testDoLaterUponDeath' + l.append(10) + def _testDoLaterUponDeath(arg1, l=l): + return done + def _monitorDoLaterUponDeath(task, tm=tm, l=l, doLaterTests=doLaterTests): + if task.time > .02: + assert l == [10,] + doLaterTests[0] -= 1 + return task.done + return task.cont + tm.doMethodLater(.01, _testDoLaterUponDeath, 'testDoLaterUponDeath', + uponDeath=_testUponDeathFunc) + doLaterTests[0] += 1 + tm.add(_monitorDoLaterUponDeath, 'monitorDoLaterUponDeath') + _testUponDeathFunc = None + _testDoLaterUponDeath = None + _monitorDoLaterUponDeath = None + # don't check until all the doLaters are finished + #tm._checkMemLeaks() + + # doLater owner + class _DoLaterOwner: + def _clearTask(self, task): + self.clearedTaskName = task.name + doLaterOwner = _DoLaterOwner() + l = [] + def _testDoLaterOwner(l=l): + pass + def _monitorDoLaterOwner(task, tm=tm, l=l, doLaterOwner=doLaterOwner, + doLaterTests=doLaterTests): + if task.time > .02: + assert hasattr(doLaterOwner, 'clearedTaskName') + assert doLaterOwner.clearedTaskName == 'testDoLaterOwner' + doLaterTests[0] -= 1 + return task.done + return task.cont + tm.doMethodLater(.01, _testDoLaterOwner, 'testDoLaterOwner', + owner=doLaterOwner) + doLaterTests[0] += 1 + tm.add(_monitorDoLaterOwner, 'monitorDoLaterOwner') + _testDoLaterOwner = None + _monitorDoLaterOwner = None + del doLaterOwner + _DoLaterOwner = None + # don't check until all the doLaters are finished + #tm._checkMemLeaks() + + # run the doLater tests + while doLaterTests[0] > 0: + tm.step() + del doLaterTests + tm._checkMemLeaks() + + # getTasks + def _testGetTasks(task): + return task.cont + # the doLaterProcessor is always running + assert len(tm.getTasks()) == 1 + tm.add(_testGetTasks, 'testGetTasks1') + assert len(tm.getTasks()) == 2 + assert (tm.getTasks()[0].name == 'testGetTasks1' or + tm.getTasks()[1].name == 'testGetTasks1') + tm.add(_testGetTasks, 'testGetTasks2') + tm.add(_testGetTasks, 'testGetTasks3') + assert len(tm.getTasks()) == 4 + tm.remove('testGetTasks2') + assert len(tm.getTasks()) == 3 + tm.remove('testGetTasks1') + tm.remove('testGetTasks3') + assert len(tm.getTasks()) == 1 + _testGetTasks = None + tm._checkMemLeaks() + + # getDoLaters + def _testGetDoLaters(): + pass + # the doLaterProcessor is always running + assert len(tm.getDoLaters()) == 0 + tm.doMethodLater(.1, _testGetDoLaters, 'testDoLater1') + assert len(tm.getDoLaters()) == 1 + assert tm.getDoLaters()[0].name == 'testDoLater1' + tm.doMethodLater(.1, _testGetDoLaters, 'testDoLater2') + tm.doMethodLater(.1, _testGetDoLaters, 'testDoLater3') + assert len(tm.getDoLaters()) == 3 + tm.remove('testDoLater2') + assert len(tm.getDoLaters()) == 2 + tm.remove('testDoLater1') + tm.remove('testDoLater3') + assert len(tm.getDoLaters()) == 0 + _testGetTasks = None + tm._checkMemLeaks() + + # getTasksNamed + def _testGetTasksNamed(task): + return task.cont + assert len(tm.getTasksNamed('testGetTasksNamed')) == 0 + tm.add(_testGetTasksNamed, 'testGetTasksNamed') + assert len(tm.getTasksNamed('testGetTasksNamed')) == 1 + assert tm.getTasksNamed('testGetTasksNamed')[0].name == 'testGetTasksNamed' + tm.add(_testGetTasksNamed, 'testGetTasksNamed') + tm.add(_testGetTasksNamed, 'testGetTasksNamed') + assert len(tm.getTasksNamed('testGetTasksNamed')) == 3 + tm.remove('testGetTasksNamed') + assert len(tm.getTasksNamed('testGetTasksNamed')) == 0 + _testGetTasksNamed = None + tm._checkMemLeaks() + + # removeTasksMatching + def _testRemoveTasksMatching(task): + return task.cont + tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching') + assert len(tm.getTasksNamed('testRemoveTasksMatching')) == 1 + tm.removeTasksMatching('testRemoveTasksMatching') + assert len(tm.getTasksNamed('testRemoveTasksMatching')) == 0 + tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching1') + tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching2') + assert len(tm.getTasksNamed('testRemoveTasksMatching1')) == 1 + assert len(tm.getTasksNamed('testRemoveTasksMatching2')) == 1 + tm.removeTasksMatching('testRemoveTasksMatching*') + assert len(tm.getTasksNamed('testRemoveTasksMatching1')) == 0 + assert len(tm.getTasksNamed('testRemoveTasksMatching2')) == 0 + tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching1a') + tm.add(_testRemoveTasksMatching, 'testRemoveTasksMatching2a') + assert len(tm.getTasksNamed('testRemoveTasksMatching1a')) == 1 + assert len(tm.getTasksNamed('testRemoveTasksMatching2a')) == 1 + tm.removeTasksMatching('testRemoveTasksMatching?a') + assert len(tm.getTasksNamed('testRemoveTasksMatching1a')) == 0 + assert len(tm.getTasksNamed('testRemoveTasksMatching2a')) == 0 + _testRemoveTasksMatching = None + tm._checkMemLeaks() + + # create Task object and add to mgr + l = [] + def _testTaskObj(task, l=l): + l.append(None) + return task.cont + t = Task(_testTaskObj) + tm.add(t, 'testTaskObj') + tm.step() + assert len(l) == 1 + tm.step() + assert len(l) == 2 + tm.remove('testTaskObj') + tm.step() + assert len(l) == 2 + _testTaskObj = None + tm._checkMemLeaks() + + # remove Task via task.remove() + l = [] + def _testTaskObjRemove(task, l=l): + l.append(None) + return task.cont + t = Task(_testTaskObjRemove) + tm.add(t, 'testTaskObjRemove') + tm.step() + assert len(l) == 1 + tm.step() + assert len(l) == 2 + t.remove() + tm.step() + assert len(l) == 2 + del t + _testTaskObjRemove = None + tm._checkMemLeaks() + + """ + # this test fails, and it's not clear what the correct behavior should be. + # priority passed to Task.__init__ is always overridden by taskMgr.add() + # even if no priority is specified, and calling Task.setPriority() has no + # effect on the taskMgr's behavior. + # set/get Task priority + l = [] + def _testTaskObjPriority(arg, task, l=l): + l.append(arg) + return task.cont + t1 = Task(_testTaskObjPriority, priority=1) + t2 = Task(_testTaskObjPriority, priority=2) + tm.add(t1, 'testTaskObjPriority1', extraArgs=['a',], appendTask=True) + tm.add(t2, 'testTaskObjPriority2', extraArgs=['b',], appendTask=True) + tm.step() + assert len(l) == 2 + assert l == ['a', 'b'] + assert t1.getPriority() == 1 + assert t2.getPriority() == 2 + t1.setPriority(3) + assert t1.getPriority() == 3 + tm.step() + assert len(l) == 4 + assert l == ['a', 'b', 'b', 'a',] + t1.remove() + t2.remove() + tm.step() + assert len(l) == 4 + del t1 + del t2 + _testTaskObjPriority = None + tm._checkMemLeaks() + """ + + del l + tm.destroy() + del tm + + runTests() + del runTests + """ import Task