lower-impact asynchronous download

This commit is contained in:
David Rose 2009-12-29 22:52:24 +00:00
parent 5cc578f524
commit 5f4c4670bd
2 changed files with 144 additions and 82 deletions

View File

@ -25,11 +25,12 @@ class PackageInfo:
unpackFactor = 0.01 unpackFactor = 0.01
patchFactor = 0.01 patchFactor = 0.01
# These tokens are returned by __downloadFile() and other # These tokens are yielded (not returned) by __downloadFile() and
# InstallStep functions. # other InstallStep functions.
stepComplete = 1 stepComplete = 1
stepFailed = 2 stepFailed = 2
restartDownload = 3 restartDownload = 3
stepContinue = 4
class InstallStep: class InstallStep:
""" This class is one step of the installPlan list; it """ This class is one step of the installPlan list; it
@ -194,29 +195,49 @@ class PackageInfo:
synchronously, and then reads it. Returns true on success, synchronously, and then reads it. Returns true on success,
false on failure. """ false on failure. """
for token in self.downloadDescFileGenerator(http):
if token != self.stepContinue:
break
Thread.considerYield()
return (token == self.stepComplete)
def downloadDescFileGenerator(self, http):
""" A generator function that implements downloadDescFile()
one piece at a time. It yields one of stepComplete,
stepFailed, or stepContinue. """
assert self.descFile assert self.descFile
if self.hasDescFile: if self.hasDescFile:
# We've already got one. # We've already got one.
return True yield self.stepComplete; return
self.http = http self.http = http
token = self.__downloadFile( for token in self.__downloadFile(
None, self.descFile, None, self.descFile,
urlbase = self.descFile.filename, urlbase = self.descFile.filename,
filename = self.descFileBasename) filename = self.descFileBasename):
if token == self.stepContinue:
yield token
else:
break
while token == self.restartDownload: while token == self.restartDownload:
# Try again. # Try again.
token = self.__downloadFile( for token in self.__downloadFile(
None, self.descFile, None, self.descFile,
urlbase = self.descFile.filename, urlbase = self.descFile.filename,
filename = self.descFileBasename) filename = self.descFileBasename):
if token == self.stepContinue:
yield token
else:
break
if token == self.stepFailed: if token == self.stepFailed:
# Couldn't download the desc file. # Couldn't download the desc file.
return False yield self.stepFailed; return
assert token == self.stepComplete assert token == self.stepComplete
@ -228,9 +249,9 @@ class PackageInfo:
# Weird, it passed the hash check, but we still can't read # Weird, it passed the hash check, but we still can't read
# it. # it.
self.notify.warning("Failure reading %s" % (filename)) self.notify.warning("Failure reading %s" % (filename))
return False yield self.stepFailed; return
return True yield self.stepComplete; return
def __readDescFile(self): def __readDescFile(self):
""" Reads the desc xml file for this particular package, """ Reads the desc xml file for this particular package,
@ -500,34 +521,59 @@ class PackageInfo:
in, which will have been done by self.__readDescFile(). in, which will have been done by self.__readDescFile().
""" """
for token in self.downloadPackageGenerator(http):
if token != self.stepContinue:
break
Thread.considerYield()
return (token == self.stepComplete)
def downloadPackageGenerator(self, http):
""" A generator function that implements downloadPackage() one
piece at a time. It yields one of stepComplete, stepFailed,
or stepContinue. """
assert self.hasDescFile assert self.hasDescFile
if self.hasPackage: if self.hasPackage:
# We've already got one. # We've already got one.
return True yield self.stepComplete; return
# We should have an install plan by the time we get here. # We should have an install plan by the time we get here.
assert self.installPlans assert self.installPlans
self.http = http self.http = http
token = self.__followInstallPlans() for token in self.__followInstallPlans():
if token == self.stepContinue:
yield token
else:
break
while token == self.restartDownload: while token == self.restartDownload:
# Try again. # Try again.
if not self.downloadDescFile(http): for token in self.downloadDescFileGenerator(http):
return False if token == self.stepContinue:
yield token
token = self.__followInstallPlans() else:
break
if token == self.stepComplete:
for token in self.__followInstallPlans():
if token == self.stepContinue:
yield token
else:
break
if token == self.stepFailed: if token == self.stepFailed:
return False yield self.stepFailed; return
assert token == self.stepComplete assert token == self.stepComplete
return True yield self.stepComplete; return
def __followInstallPlans(self): def __followInstallPlans(self):
""" Performs all of the steps in self.installPlans. Returns """ Performs all of the steps in self.installPlans. Yields
one of stepComplete, stepFailed, or restartDownload. """ one of stepComplete, stepFailed, restartDownload, or
stepContinue. """
if not self.installPlans: if not self.installPlans:
self.__buildInstallPlans() self.__buildInstallPlans()
@ -543,9 +589,14 @@ class PackageInfo:
for step in plan: for step in plan:
self.currentStepEffort = step.getEffort() self.currentStepEffort = step.getEffort()
token = step.func(step) for token in step.func(step):
if token == self.stepContinue:
yield token
else:
break
if token == self.restartDownload: if token == self.restartDownload:
return token yield token
if token == self.stepFailed: if token == self.stepFailed:
planFailed = True planFailed = True
break break
@ -555,13 +606,13 @@ class PackageInfo:
if not planFailed: if not planFailed:
# Successfully downloaded! # Successfully downloaded!
return self.stepComplete yield self.stepComplete; return
if taskMgr.destroyed: if taskMgr.destroyed:
return self.stepFailed yield self.stepFailed; return
# All plans failed. # All plans failed.
return self.stepFailed yield self.stepFailed; return
def __findPatchChain(self, fileSpec): def __findPatchChain(self, fileSpec):
""" Finds the chain of patches that leads from the indicated """ Finds the chain of patches that leads from the indicated
@ -596,8 +647,8 @@ class PackageInfo:
def __downloadFile(self, step, fileSpec, urlbase = None, filename = None, def __downloadFile(self, step, fileSpec, urlbase = None, filename = None,
allowPartial = False): allowPartial = False):
""" Downloads the indicated file from the host into """ Downloads the indicated file from the host into
packageDir. Returns one of stepComplete, stepFailed, or packageDir. Yields one of stepComplete, stepFailed,
restartDownload. """ restartDownload, or stepContinue. """
if not urlbase: if not urlbase:
urlbase = self.descFileDirname + '/' + fileSpec.filename urlbase = self.descFileDirname + '/' + fileSpec.filename
@ -691,9 +742,9 @@ class PackageInfo:
# If the task manager has been destroyed, we must # If the task manager has been destroyed, we must
# be shutting down. Get out of here. # be shutting down. Get out of here.
self.notify.warning("Task Manager destroyed, aborting %s" % (url)) self.notify.warning("Task Manager destroyed, aborting %s" % (url))
return self.stepFailed yield self.stepFailed; return
Thread.considerYield() yield self.stepContinue
if step: if step:
step.bytesDone = channel.getBytesDownloaded() + channel.getFirstByteDelivered() step.bytesDone = channel.getBytesDownloaded() + channel.getFirstByteDelivered()
@ -710,11 +761,11 @@ class PackageInfo:
# sure. # sure.
if self.host.redownloadContentsFile(self.http): if self.host.redownloadContentsFile(self.http):
# Yes! Go back and start over from the beginning. # Yes! Go back and start over from the beginning.
return self.restartDownload yield self.restartDownload; return
else: else:
# Success! # Success!
return self.stepComplete yield self.stepComplete; return
# Maybe the mirror is bad. Go back and try the next # Maybe the mirror is bad. Go back and try the next
# mirror. # mirror.
@ -723,17 +774,17 @@ class PackageInfo:
# is stale. Try re-downloading it now, just to be sure. # is stale. Try re-downloading it now, just to be sure.
if self.host.redownloadContentsFile(self.http): if self.host.redownloadContentsFile(self.http):
# Yes! Go back and start over from the beginning. # Yes! Go back and start over from the beginning.
return self.restartDownload yield self.restartDownload; return
# All mirrors failed; the server (or the internet connection) # All mirrors failed; the server (or the internet connection)
# must be just fubar. # must be just fubar.
return self.stepFailed yield self.stepFailed; return
def __applyPatch(self, step, patchfile): def __applyPatch(self, step, patchfile):
""" Applies the indicated patching in-place to the current """ Applies the indicated patching in-place to the current
uncompressed archive. The patchfile is removed after the uncompressed archive. The patchfile is removed after the
operation. Returns one of stepComplete, stepFailed, or operation. Yields one of stepComplete, stepFailed,
restartDownload. """ restartDownload, or stepContinue. """
origPathname = Filename(self.getPackageDir(), self.uncompressedArchive.filename) origPathname = Filename(self.getPackageDir(), self.uncompressedArchive.filename)
patchPathname = Filename(self.getPackageDir(), patchfile.file.filename) patchPathname = Filename(self.getPackageDir(), patchfile.file.filename)
@ -752,9 +803,9 @@ class PackageInfo:
# If the task manager has been destroyed, we must # If the task manager has been destroyed, we must
# be shutting down. Get out of here. # be shutting down. Get out of here.
self.notify.warning("Task Manager destroyed, aborting patch %s" % (origPathname)) self.notify.warning("Task Manager destroyed, aborting patch %s" % (origPathname))
return self.stepFailed yield self.stepFailed; return
Thread.considerYield() yield self.stepContinue
ret = p.run() ret = p.run()
del p del p
patchPathname.unlink() patchPathname.unlink()
@ -762,18 +813,18 @@ class PackageInfo:
if ret < 0: if ret < 0:
self.notify.warning("Patching of %s failed." % (origPathname)) self.notify.warning("Patching of %s failed." % (origPathname))
result.unlink() result.unlink()
return self.stepFailed yield self.stepFailed; return
if not result.renameTo(origPathname): if not result.renameTo(origPathname):
self.notify.warning("Couldn't rename %s to %s" % (result, origPathname)) self.notify.warning("Couldn't rename %s to %s" % (result, origPathname))
return self.stepFailed yield self.stepFailed; return
return self.stepComplete yield self.stepComplete; return
def __uncompressArchive(self, step): def __uncompressArchive(self, step):
""" Turns the compressed archive into the uncompressed """ Turns the compressed archive into the uncompressed
archive. Returns one of stepComplete, stepFailed, or archive. Yields one of stepComplete, stepFailed,
restartDownload. """ restartDownload, or stepContinue. """
sourcePathname = Filename(self.getPackageDir(), self.compressedArchive.filename) sourcePathname = Filename(self.getPackageDir(), self.compressedArchive.filename)
targetPathname = Filename(self.getPackageDir(), self.uncompressedArchive.filename) targetPathname = Filename(self.getPackageDir(), self.uncompressedArchive.filename)
@ -791,12 +842,12 @@ class PackageInfo:
# If the task manager has been destroyed, we must # If the task manager has been destroyed, we must
# be shutting down. Get out of here. # be shutting down. Get out of here.
self.notify.warning("Task Manager destroyed, aborting decompresss %s" % (sourcePathname)) self.notify.warning("Task Manager destroyed, aborting decompresss %s" % (sourcePathname))
return self.stepFailed yield self.stepFailed; return
Thread.considerYield() yield self.stepContinue
if result != EUSuccess: if result != EUSuccess:
return self.stepFailed yield self.stepFailed; return
step.bytesDone = totalBytes step.bytesDone = totalBytes
self.__updateStepProgress(step) self.__updateStepProgress(step)
@ -804,31 +855,31 @@ class PackageInfo:
if not self.uncompressedArchive.quickVerify(self.getPackageDir(), notify= self.notify): if not self.uncompressedArchive.quickVerify(self.getPackageDir(), notify= self.notify):
self.notify.warning("after uncompressing, %s still incorrect" % ( self.notify.warning("after uncompressing, %s still incorrect" % (
self.uncompressedArchive.filename)) self.uncompressedArchive.filename))
return self.stepFailed yield self.stepFailed; return
# Now that we've verified the archive, make it read-only. # Now that we've verified the archive, make it read-only.
os.chmod(targetPathname.toOsSpecific(), 0444) os.chmod(targetPathname.toOsSpecific(), 0444)
# Now we can safely remove the compressed archive. # Now we can safely remove the compressed archive.
sourcePathname.unlink() sourcePathname.unlink()
return self.stepComplete yield self.stepComplete; return
def __unpackArchive(self, step): def __unpackArchive(self, step):
""" Unpacks any files in the archive that want to be unpacked """ Unpacks any files in the archive that want to be unpacked
to disk. Returns one of stepComplete, stepFailed, or to disk. Yields one of stepComplete, stepFailed,
restartDownload. """ restartDownload, or stepContinue. """
if not self.extracts: if not self.extracts:
# Nothing to extract. # Nothing to extract.
self.hasPackage = True self.hasPackage = True
return self.stepComplete yield self.stepComplete; return
mfPathname = Filename(self.getPackageDir(), self.uncompressedArchive.filename) mfPathname = Filename(self.getPackageDir(), self.uncompressedArchive.filename)
self.notify.info("Unpacking %s" % (mfPathname)) self.notify.info("Unpacking %s" % (mfPathname))
mf = Multifile() mf = Multifile()
if not mf.openRead(mfPathname): if not mf.openRead(mfPathname):
self.notify.warning("Couldn't open %s" % (mfPathname)) self.notify.warning("Couldn't open %s" % (mfPathname))
return self.stepFailed yield self.stepFailed; return
allExtractsOk = True allExtractsOk = True
step.bytesDone = 0 step.bytesDone = 0
@ -860,15 +911,15 @@ class PackageInfo:
# If the task manager has been destroyed, we must # If the task manager has been destroyed, we must
# be shutting down. Get out of here. # be shutting down. Get out of here.
self.notify.warning("Task Manager destroyed, aborting unpacking %s" % (mfPathname)) self.notify.warning("Task Manager destroyed, aborting unpacking %s" % (mfPathname))
return self.stepFailed yield self.stepFailed; return
Thread.considerYield() yield self.stepContinue
if not allExtractsOk: if not allExtractsOk:
return self.stepFailed yield self.stepFailed; return
self.hasPackage = True self.hasPackage = True
return self.stepComplete yield self.stepComplete; return
def installPackage(self, appRunner): def installPackage(self, appRunner):
""" Mounts the package and sets up system paths so it becomes """ Mounts the package and sets up system paths so it becomes

View File

@ -1,5 +1,5 @@
from direct.showbase.DirectObject import DirectObject from direct.showbase.DirectObject import DirectObject
from direct.stdpy.threading import Lock from direct.stdpy.threading import Lock, RLock
from direct.showbase.MessengerGlobal import messenger from direct.showbase.MessengerGlobal import messenger
from direct.task.TaskManagerGlobal import taskMgr from direct.task.TaskManagerGlobal import taskMgr
from direct.p3d.PackageInfo import PackageInfo from direct.p3d.PackageInfo import PackageInfo
@ -142,7 +142,7 @@ class PackageInstaller(DirectObject):
return True return True
def __init__(self, appRunner, taskChain = 'install'): def __init__(self, appRunner, taskChain = 'default'):
self.globalLock.acquire() self.globalLock.acquire()
try: try:
self.uniqueId = PackageInstaller.nextUniqueId self.uniqueId = PackageInstaller.nextUniqueId
@ -153,9 +153,10 @@ class PackageInstaller(DirectObject):
self.appRunner = appRunner self.appRunner = appRunner
self.taskChain = taskChain self.taskChain = taskChain
# If the task chain hasn't yet been set up, create the # If we're to be running on an asynchronous task chain, and
# the task chain hasn't yet been set up already, create the
# default parameters now. # default parameters now.
if not taskMgr.hasTaskChain(self.taskChain): if taskChain != 'default' and not taskMgr.hasTaskChain(self.taskChain):
taskMgr.setupTaskChain(self.taskChain, numThreads = 1, taskMgr.setupTaskChain(self.taskChain, numThreads = 1,
threadPriority = TPLow) threadPriority = TPLow)
@ -165,7 +166,7 @@ class PackageInstaller(DirectObject):
# A list of all packages that have been added to the # A list of all packages that have been added to the
# installer. # installer.
self.packageLock = Lock() self.packageLock = RLock()
self.packages = [] self.packages = []
self.state = self.S_initial self.state = self.S_initial
@ -528,17 +529,20 @@ class PackageInstaller(DirectObject):
it extracts one package from self.needsDownload and downloads it extracts one package from self.needsDownload and downloads
it. """ it. """
while True:
self.packageLock.acquire() self.packageLock.acquire()
try: try:
# If we're done downloading, stop the task. # If we're done downloading, stop the task.
if self.state == self.S_done or not self.needsDownload: if self.state == self.S_done or not self.needsDownload:
self.downloadTask = None self.downloadTask = None
return task.done self.packageLock.release()
yield task.done; return
assert self.state == self.S_started assert self.state == self.S_started
pp = self.needsDownload[0] pp = self.needsDownload[0]
del self.needsDownload[0] del self.needsDownload[0]
finally: except:
self.packageLock.release()
self.packageLock.release() self.packageLock.release()
# Now serve this one package. # Now serve this one package.
@ -546,14 +550,21 @@ class PackageInstaller(DirectObject):
[pp], taskChain = 'default') [pp], taskChain = 'default')
if not pp.package.hasPackage: if not pp.package.hasPackage:
if not pp.package.downloadPackage(self.appRunner.http): for token in pp.package.downloadPackageGenerator(self.appRunner.http):
if token == pp.package.stepContinue:
yield task.cont
else:
break
if token != pp.package.stepComplete:
self.__donePackage(pp, False) self.__donePackage(pp, False)
return task.cont yield task.cont
continue
# Successfully downloaded and installed. # Successfully downloaded and installed.
self.__donePackage(pp, True) self.__donePackage(pp, True)
return task.cont yield task.cont
def __donePackage(self, pp, success): def __donePackage(self, pp, success):
""" Marks the indicated package as done, either successfully """ Marks the indicated package as done, either successfully