round-robin Jobs based on priority

This commit is contained in:
Darren Ranalli 2007-04-10 08:33:31 +00:00
parent 8e9d2b85ab
commit c5e0e5907a
2 changed files with 40 additions and 13 deletions

View File

@ -10,9 +10,12 @@ class Job(DirectObject):
# values to yield from your run() generator method # values to yield from your run() generator method
Done = object() Done = object()
Continue = None # 'yield None' is acceptable in place of 'yield Job.Continue' Continue = None # 'yield None' is acceptable in place of 'yield Job.Continue'
Sleep = object() # yield any remaining time for this job until next frame
# these priorities are reference points, you can use whatever numbers you want # These priorities determine how many timeslices a job gets relative to other
Priorities = ScratchPad(Low=-100, Normal=0, High=100) # jobs. A job with priority of 1000 will run 10 times more often than a job
# with priority of 100.
Priorities = ScratchPad(Min=1, Low=100, Normal=1000, High=10000)
_SerialGen = SerialNumGen() _SerialGen = SerialNumGen()
def __init__(self, name): def __init__(self, name):

View File

@ -13,7 +13,7 @@ class JobManager:
# there's one task for the JobManager, all jobs run in this task # there's one task for the JobManager, all jobs run in this task
TaskName = 'jobManager' TaskName = 'jobManager'
# run for 1/2 millisecond per frame by default # run for 1/2 millisecond per frame by default
DefTimeslice = (1./1000.) / 2. DefTimeslice = (1./1000.) * .5
def __init__(self, timeslice=None): def __init__(self, timeslice=None):
if timeslice is None: if timeslice is None:
@ -27,6 +27,14 @@ class JobManager:
self._pri2jobIds = {} self._pri2jobIds = {}
# jobId -> priority # jobId -> priority
self._jobId2pri = {} self._jobId2pri = {}
# how many timeslices to give each job; this is used to efficiently implement
# the relative job priorities
self._jobId2timeslices = {}
# this is the working copy of _jobId2timeslices that we use to count down how
# many timeslices to give each job
self._jobId2timeslicesLeft = {}
# this is used to round-robin the jobs in _jobId2timeslicesLeft
self._curJobIndex = 0
self._highestPriority = Job.Priorities.Normal self._highestPriority = Job.Priorities.Normal
def destroy(self): def destroy(self):
@ -44,6 +52,8 @@ class JobManager:
# add the jobId onto the end of the list of jobIds for this priority # add the jobId onto the end of the list of jobIds for this priority
self._pri2jobIds.setdefault(pri, []) self._pri2jobIds.setdefault(pri, [])
self._pri2jobIds[pri].append(jobId) self._pri2jobIds[pri].append(jobId)
# record the job's relative timeslice count
self._jobId2timeslices[jobId] = pri
if len(self._jobId2pri) == 1: if len(self._jobId2pri) == 1:
taskMgr.add(self._process, JobManager.TaskName) taskMgr.add(self._process, JobManager.TaskName)
self._highestPriority = pri self._highestPriority = pri
@ -61,6 +71,9 @@ class JobManager:
del self._pri2jobId2job[pri][jobId] del self._pri2jobId2job[pri][jobId]
# clean up the job's generator, if any # clean up the job's generator, if any
job._cleanupGenerator() job._cleanupGenerator()
# remove the job's timeslice count
self._jobId2timeslices.pop(jobId)
self._jobId2timeslicesLeft.pop(jobId)
if len(self._pri2jobId2job[pri]) == 0: if len(self._pri2jobId2job[pri]) == 0:
del self._pri2jobId2job[pri] del self._pri2jobId2job[pri]
if pri == self._highestPriority: if pri == self._highestPriority:
@ -122,13 +135,18 @@ class JobManager:
# figure out how long we can run # figure out how long we can run
endT = globalClock.getRealTime() + (self._timeslice * .9) endT = globalClock.getRealTime() + (self._timeslice * .9)
while True: while True:
# always process the highest priority first # round-robin the jobs, dropping them as they run out of priority timeslices
# TODO: give occasional timeslices to lower priorities to avoid starving # until all timeslices are used
# lower-priority jobs if len(self._jobId2timeslicesLeft) == 0:
jobId2job = self._pri2jobId2job[self._highestPriority] self._jobId2timeslicesLeft = dict(self._jobId2timeslices)
# process jobs with equal priority in the order they came in self._curJobIndex = (self._curJobIndex + 1) % len(self._jobId2timeslicesLeft)
jobId = self._pri2jobIds[self._highestPriority][0] jobId = self._jobId2timeslicesLeft.keys()[self._curJobIndex]
job = jobId2job[jobId] # use up one of this job's timeslices
self._jobId2timeslicesLeft[jobId] -= 1
if self._jobId2timeslicesLeft[jobId] == 0:
del self._jobId2timeslicesLeft[jobId]
pri = self._jobId2pri[jobId]
job = self._pri2jobId2job[pri][jobId]
gen = job._getGenerator() gen = job._getGenerator()
if __debug__: if __debug__:
job._pstats.start() job._pstats.start()
@ -141,15 +159,21 @@ class JobManager:
# treat it as if it returned Job.Done # treat it as if it returned Job.Done
self.notify.warning('job %s never yielded Job.Done' % job) self.notify.warning('job %s never yielded Job.Done' % job)
result = Job.Done result = Job.Done
if result is Job.Done:
if result is Job.Sleep:
job.suspend()
if __debug__:
job._pstats.stop()
# grab the next job if there's time left
break
elif result is Job.Done:
job.suspend() job.suspend()
self.remove(job) self.remove(job)
job.finished() job.finished()
if __debug__: if __debug__:
job._pstats.stop() job._pstats.stop()
messenger.send(job.getFinishedEvent()) messenger.send(job.getFinishedEvent())
# highest-priority job is done. # grab the next job if there's time left
# grab the next one if there's time left
break break
else: else:
# we've run out of time # we've run out of time