From 8a8b59d48486b34867d8472a1182500ef13fb9a0 Mon Sep 17 00:00:00 2001 From: Darren Ranalli Date: Sat, 10 Mar 2007 11:06:21 +0000 Subject: [PATCH] added JobManager --- direct/src/showbase/Job.py | 65 ++++++++++++++ direct/src/showbase/JobManager.py | 108 ++++++++++++++++++++++++ direct/src/showbase/JobManagerGlobal.py | 5 ++ 3 files changed, 178 insertions(+) create mode 100755 direct/src/showbase/Job.py create mode 100755 direct/src/showbase/JobManager.py create mode 100755 direct/src/showbase/JobManagerGlobal.py diff --git a/direct/src/showbase/Job.py b/direct/src/showbase/Job.py new file mode 100755 index 0000000000..e3c152ca86 --- /dev/null +++ b/direct/src/showbase/Job.py @@ -0,0 +1,65 @@ +class Job: + # Base class for cpu-intensive or non-time-critical operations that + # are run through the JobManager. + Done = object() + Continue = object() + Priorities = ScratchPad(Low=-100, Normal=0, High=100) + _SerialGen = SerialNumGen() + + def __init__(self, name): + self._name = name + self._generator = None + self._id = Job._SerialGen.next() + + def destroy(self): + del self._name + del self._generator + + def getPriority(self): + # override if you want a different priority + # you can use numbers other than those in Job.Priorities + return Job.Priorities.Normal + + def run(self): + # override and yield Job.Continue when possible/reasonable + # try not to run longer than the JobManager's timeslice between yields + # when done, yield Job.Done + raise "don't call down" + + def _getJobId(self): + return self._id + + def _getGenerator(self): + if self._generator is None: + self._generator = self.run() + return self._generator + +if __debug__: # __dev__ not yet available at this point + from direct.showbase.Job import Job + class TestJob(Job): + def __init__(self): + Job.__init__(self, 'TestJob') + self._counter = 0 + self._accum = 0 + self._finished = False + + def run(self): + while True: + while self._accum < 100: + self._accum += 1 + print 'counter = %s, accum = %s' % (self._counter, self._accum) + yield Job.Continue + + self._accum = 0 + self._counter += 1 + + if self._counter >= 100: + print 'Job.Done' + yield Job.Done + else: + yield Job.Continue + + def addTestJob(): + t = TestJob() + jobMgr.add(t) + diff --git a/direct/src/showbase/JobManager.py b/direct/src/showbase/JobManager.py new file mode 100755 index 0000000000..2228519329 --- /dev/null +++ b/direct/src/showbase/JobManager.py @@ -0,0 +1,108 @@ +from direct.directnotify.DirectNotifyGlobal import directNotify +from direct.task.TaskManagerGlobal import taskMgr +from direct.showbase.Job import Job + +class JobManager: + """ + Similar to the taskMgr but designed for tasks that are CPU-intensive and/or + not time-critical. Jobs run one at a time, in order of priority, in + the timeslice that the JobManager is allowed to run each frame. + """ + notify = directNotify.newCategory("JobManager") + + # there's one main task for the JobManager, all jobs run in this task + TaskName = 'jobManager' + # run for one millisecond per frame by default + DefTimeslice = .001 + + def __init__(self, timeslice=None): + if timeslice is None: + timeslice = JobManager.DefTimeslice + # how long do we run per frame + self._timeslice = timeslice + # store the jobs in these structures to allow fast lookup by various keys + # priority -> jobId -> job + self._pri2jobId2job = {} + # priority -> chronological list of jobIds + self._pri2jobIds = {} + # jobId -> priority + self._jobId2pri = {} + self._highestPriority = Job.Priorities.Normal + + def destroy(self): + taskMgr.remove(JobManager.TaskName) + del self._pri2jobId2job + + def add(self, job): + assert self.notify.debugCall() + pri = job.getPriority() + jobId = job._getJobId() + # store the job in the main table + self._pri2jobId2job.setdefault(pri, {}) + self._pri2jobId2job[pri][jobId] = job + # and also store a direct mapping from the job's ID to its priority + self._jobId2pri[jobId] = pri + # add the jobId onto the end of the list of jobIds for this priority + self._pri2jobIds.setdefault(pri, []) + self._pri2jobIds[pri].append(jobId) + if pri > self._highestPriority: + self._highestPriority = pri + if len(self._jobId2pri) == 1: + taskMgr.add(self._process, JobManager.TaskName) + + def remove(self, job): + assert self.notify.debugCall() + jobId = job._getJobId() + # look up the job's priority + pri = self._jobId2pri.pop(jobId) + # TODO: this removal is a linear search + self._pri2jobIds[pri].remove(jobId) + # remove the job from the main table + del self._pri2jobId2job[pri][jobId] + if len(self._pri2jobId2job[pri]) == 0: + del self._pri2jobId2job[pri] + if pri == self._highestPriority: + if len(self._jobId2pri) > 0: + # calculate a new highest priority + # TODO: this is not very fast + priorities = self._pri2jobId2job.keys() + priorities.sort() + self._highestPriority = priorities[-1] + else: + taskMgr.remove(JobManager.TaskName) + self._highestPriority = 0 + + # how long should we run per frame? + def getTimeslice(self): + return self._timeslice + def setTimeslice(self, timeslice): + self._timeslice = timeslice + + def _process(self, task=None): + if len(self._pri2jobId2job): + assert self.notify.debugCall() + # figure out how long we can run + endT = globalClock.getRealTime() + (self._timeslice * .9) + while True: + # always process the highest priority first + jobId2job = self._pri2jobId2job[self._highestPriority] + # process jobs with equal priority in the order they came in + jobId = self._pri2jobIds[self._highestPriority][-1] + job = jobId2job[jobId] + gen = job._getGenerator() + while globalClock.getRealTime() < endT: + result = gen.next() + if result is Job.Done: + self.remove(job) + # highest-priority job is done. + # grab the next one if there's time left + break + else: + # we've run out of time + assert self.notify.debug('out of time: %s, %s' % (endT, globalClock.getRealTime())) + break + + if len(self._pri2jobId2job) == 0: + # there's nothing left to do + break + return task.cont diff --git a/direct/src/showbase/JobManagerGlobal.py b/direct/src/showbase/JobManagerGlobal.py new file mode 100755 index 0000000000..c5f591715d --- /dev/null +++ b/direct/src/showbase/JobManagerGlobal.py @@ -0,0 +1,5 @@ +__all__ = ['jobMgr'] + +import JobManager + +jobMgr = JobManager.JobManager()