split out FindContainers job

This commit is contained in:
Darren Ranalli 2007-04-10 08:34:50 +00:00
parent c5e0e5907a
commit 8464d45d9f

View File

@ -25,160 +25,6 @@ def _createContainerLeak():
return task.cont
task = taskMgr.add(leakContainer, 'leakContainer-%s' % serialNum())
class CheckContainers(Job):
"""
Job to check container sizes and find potential leaks; sub-job of ContainerLeakDetector
"""
ReprItems = 5
def __init__(self, name, leakDetector, index):
Job.__init__(self, name)
self._leakDetector = leakDetector
self.notify = self._leakDetector.notify
self._index = index
ContainerLeakDetector.addPrivateId(self.__dict__)
def destroy(self):
ContainerLeakDetector.removePrivateId(self.__dict__)
Job.destroy(self)
def getPriority(self):
return Job.Priorities.Normal
def run(self):
self._leakDetector._index2containerId2len[self._index] = {}
#self._leakDetector.notify.debug(repr(self._leakDetector._id2ref))
ids = self._leakDetector.getContainerIds()
# record the current len of each container
for id in ids:
yield None
try:
for result in self._leakDetector.getContainerByIdGen(id):
yield None
container = result
except Exception, e:
# this container no longer exists
if self.notify.getDebug():
for contName in self._leakDetector.getContainerNameByIdGen(id):
yield None
self.notify.debug(
'%s no longer exists; caught exception in getContainerById (%s)' % (
contName, e))
self._leakDetector.removeContainerById(id)
continue
if container is None:
# this container no longer exists
if self.notify.getDebug():
for contName in self._leakDetector.getContainerNameByIdGen(id):
yield None
self.notify.debug('%s no longer exists; getContainerById returned None' %
contName)
self._leakDetector.removeContainerById(id)
continue
try:
cLen = len(container)
except Exception, e:
# this container no longer exists
if self.notify.getDebug():
for contName in self._leakDetector.getContainerNameByIdGen(id):
yield None
self.notify.debug(
'%s is no longer a container, it is now %s (%s)' %
(contName, safeRepr(container), e))
self._leakDetector.removeContainerById(id)
continue
self._leakDetector._index2containerId2len[self._index][id] = cLen
# compare the current len of each container to past lens
if self._index > 0:
idx2id2len = self._leakDetector._index2containerId2len
for id in idx2id2len[self._index]:
yield None
if id in idx2id2len[self._index-1]:
diff = idx2id2len[self._index][id] - idx2id2len[self._index-1][id]
if diff > 0:
if diff > idx2id2len[self._index-1][id]:
minutes = (self._leakDetector._index2delay[self._index] -
self._leakDetector._index2delay[self._index-1]) / 60.
name = self._leakDetector.getContainerNameById(id)
if idx2id2len[self._index-1][id] != 0:
percent = 100. * (float(diff) / float(idx2id2len[self._index-1][id]))
for container in self._leakDetector.getContainerByIdGen(id):
yield None
self.notify.warning(
'%s (%s) grew %.2f%% in %.2f minutes (currently %s items): %s' % (
name, itype(container), percent, minutes, idx2id2len[self._index][id],
fastRepr(container, maxLen=CheckContainers.ReprItems)))
yield None
if (self._index > 2 and
id in idx2id2len[self._index-2] and
id in idx2id2len[self._index-3]):
diff2 = idx2id2len[self._index-1][id] - idx2id2len[self._index-2][id]
diff3 = idx2id2len[self._index-2][id] - idx2id2len[self._index-3][id]
if self._index <= 4:
if diff > 0 and diff2 > 0 and diff3 > 0:
name = self._leakDetector.getContainerNameById(id)
for container in self._leakDetector.getContainerByIdGen(id):
yield None
msg = ('%s (%s) consistently increased in size over the last '
'3 periods (currently %s items): %s' %
(name, itype(container), idx2id2len[self._index][id],
fastRepr(container, maxLen=CheckContainers.ReprItems)))
self.notify.warning(msg)
yield None
elif (id in idx2id2len[self._index-4] and
id in idx2id2len[self._index-5]):
# if size has consistently increased over the last 5 checks,
# send out a warning
diff4 = idx2id2len[self._index-3][id] - idx2id2len[self._index-4][id]
diff5 = idx2id2len[self._index-4][id] - idx2id2len[self._index-5][id]
if diff > 0 and diff2 > 0 and diff3 > 0 and diff4 > 0 and diff5 > 0:
name = self._leakDetector.getContainerNameById(id)
for container in self._leakDetector.getContainerByIdGen(id):
yield None
msg = ('%s (%s) consistently increased in size over the last '
'5 periods (currently %s items): %s' %
(name, itype(container), idx2id2len[self._index][id],
fastRepr(container, maxLen=CheckContainers.ReprItems)))
self.notify.warning(msg)
self.notify.warning('sending notification...')
yield None
for result in self._leakDetector.getContainerByIdGen(id):
yield None
container = result
messenger.send(self._leakDetector.getLeakEvent(), [container, name])
yield Job.Done
class PruneContainerRefs(Job):
"""
Job to destroy any container refs that are no longer valid.
Checks validity by asking for each container
"""
def __init__(self, name, leakDetector):
Job.__init__(self, name)
self._leakDetector = leakDetector
self.notify = self._leakDetector.notify
ContainerLeakDetector.addPrivateId(self.__dict__)
def destroy(self):
ContainerLeakDetector.removePrivateId(self.__dict__)
Job.destroy(self)
def getPriority(self):
return Job.Priorities.Normal-1
def run(self):
ids = self._leakDetector._id2ref.keys()
for id in ids:
yield None
try:
for result in self._leakDetector.getContainerByIdGen(id):
yield None
container = result
except:
# reference is invalid, remove it
self._leakDetector.removeContainerById(id)
yield Job.Done
class NoDictKey:
pass
@ -382,31 +228,16 @@ class ContainerRef:
pass
return result
class ContainerLeakDetector(Job):
class FindContainers(Job):
"""
Low-priority Python object-graph walker that looks for leaking containers.
To reduce memory usage, this does a random walk of the Python objects to
discover containers rather than keep a set of all visited objects; it may
visit the same object many times but eventually it will discover every object.
Checks container sizes at ever-increasing intervals.
Explore the Python graph, looking for objects that support __len__()
"""
notify = directNotify.newCategory("ContainerLeakDetector")
# set of containers that should not be examined
PrivateIds = set()
def __init__(self, name, firstCheckDelay = None):
def __init__(self, name, leakDetector):
Job.__init__(self, name)
self._serialNum = serialNum()
self._priority = (Job.Priorities.Low + Job.Priorities.Normal) / 2
self._checkContainersJob = None
if firstCheckDelay is None:
firstCheckDelay = 60. * (15./2)
self._nextCheckDelay = firstCheckDelay
self._pruneTaskPeriod = config.GetFloat('leak-detector-prune-period', 60. * 30.)
self._index2containerId2len = {}
self._index2delay = {}
# set up our data structures
self._id2ref = {}
self._leakDetector = leakDetector
self._id2ref = self._leakDetector._id2ref
self.notify = self._leakDetector.notify
ContainerLeakDetector.addPrivateObj(self.__dict__)
# set up the base/starting object
self._baseObjRef = ContainerRef(Indirection(evalStr='__builtin__.__dict__'))
@ -429,71 +260,66 @@ class ContainerLeakDetector(Job):
for i in self._nameContainerGen(simbase.__dict__, self._baseObjRef):
pass
if config.GetBool('leak-container', 0):
_createContainerLeak()
self._curObjRef = self._baseObjRef
jobMgr.add(self)
ContainerLeakDetector.PrivateIds.update(set([
id(ContainerLeakDetector.PrivateIds),
id(self.__dict__),
]))
def destroy(self):
self.ignoreAll()
if self._checkContainersJob is not None:
jobMgr.remove(self._checkContainersJob)
self._checkContainersJob = None
del self._id2ref
del self._index2containerId2len
del self._index2delay
ContainerLeakDetector.removePrivateObj(self.__dict__)
Job.destroy(self)
def getPriority(self):
return self._priority
@classmethod
def addPrivateId(cls, obj):
cls.PrivateIds.add(id(obj))
@classmethod
def removePrivateId(cls, obj):
cls.PrivateIds.remove(id(obj))
def _getCheckTaskName(self):
return 'checkForLeakingContainers-%s' % self._serialNum
def _getPruneTaskName(self):
return 'pruneLeakingContainerRefs-%s' % self._serialNum
def getLeakEvent(self):
# passes description string as argument
return 'containerLeakDetected-%s' % self._serialNum
def getContainerIds(self):
return self._id2ref.keys()
def getContainerByIdGen(self, id):
# return a generator to look up a container
return self._id2ref[id].getContainer()
def getContainerById(self, id):
for result in self._id2ref[id].getContainer():
return Job.Priorities.Low
def _isDeadEnd(self, obj, objName=None):
if type(obj) in (types.BooleanType, types.BuiltinFunctionType,
types.BuiltinMethodType, types.ComplexType,
types.FloatType, types.IntType, types.LongType,
types.NoneType, types.NotImplementedType,
types.TypeType, types.CodeType, types.FunctionType,
types.StringType, types.UnicodeType,
types.TupleType):
return True
# if it's an internal object, ignore it
if id(obj) in ContainerLeakDetector.PrivateIds:
return True
if objName in ('im_self', 'im_class'):
return True
try:
className = obj.__class__.__name__
except:
pass
return result
def getContainerNameByIdGen(self, id):
return self._id2ref[id].getNameGen()
def getContainerNameById(self, id):
if id in self._id2ref:
return repr(self._id2ref[id])
return '<unknown container>'
def removeContainerById(self, id):
if id in self._id2ref:
self._id2ref[id].destroy()
del self._id2ref[id]
else:
# prevent infinite recursion in built-in containers related to methods
if className == 'method-wrapper':
return True
return False
def _isContainer(self, obj):
try:
len(obj)
except:
return False
return True
def _nameContainerGen(self, cont, objRef):
"""
if self.notify.getDebug():
self.notify.debug('_nameContainer: %s' % objRef)
#printStack()
"""
contId = id(cont)
# if this container is new, or the objRef repr is shorter than what we already have,
# put it in the table
if contId in self._id2ref:
for existingRepr in self._id2ref[contId].getNameGen():
yield None
for newRepr in objRef.getNameGen():
yield None
if contId not in self._id2ref or len(newRepr) < len(existingRepr):
if contId in self._id2ref:
self._leakDetector.removeContainerById(contId)
self._id2ref[contId] = objRef
def run(self):
taskMgr.doMethodLater(self._nextCheckDelay, self._checkForLeaks,
self._getCheckTaskName())
self._scheduleNextPruning()
while True:
# yield up here instead of at the end, since we skip back to the
# top of the while loop from various points
@ -653,62 +479,278 @@ class ContainerLeakDetector(Job):
del child
continue
class CheckContainers(Job):
"""
Job to check container sizes and find potential leaks; sub-job of ContainerLeakDetector
"""
ReprItems = 5
def __init__(self, name, leakDetector, index):
Job.__init__(self, name)
self._leakDetector = leakDetector
self.notify = self._leakDetector.notify
self._index = index
ContainerLeakDetector.addPrivateObj(self.__dict__)
def destroy(self):
ContainerLeakDetector.removePrivateObj(self.__dict__)
Job.destroy(self)
def getPriority(self):
return Job.Priorities.Normal
def run(self):
self._leakDetector._index2containerId2len[self._index] = {}
ids = self._leakDetector.getContainerIds()
# record the current len of each container
for id in ids:
yield None
try:
for result in self._leakDetector.getContainerByIdGen(id):
yield None
container = result
except Exception, e:
# this container no longer exists
if self.notify.getDebug():
for contName in self._leakDetector.getContainerNameByIdGen(id):
yield None
self.notify.debug(
'%s no longer exists; caught exception in getContainerById (%s)' % (
contName, e))
self._leakDetector.removeContainerById(id)
continue
if container is None:
# this container no longer exists
if self.notify.getDebug():
for contName in self._leakDetector.getContainerNameByIdGen(id):
yield None
self.notify.debug('%s no longer exists; getContainerById returned None' %
contName)
self._leakDetector.removeContainerById(id)
continue
try:
cLen = len(container)
except Exception, e:
# this container no longer exists
if self.notify.getDebug():
for contName in self._leakDetector.getContainerNameByIdGen(id):
yield None
self.notify.debug(
'%s is no longer a container, it is now %s (%s)' %
(contName, safeRepr(container), e))
self._leakDetector.removeContainerById(id)
continue
self._leakDetector._index2containerId2len[self._index][id] = cLen
# compare the current len of each container to past lens
if self._index > 0:
idx2id2len = self._leakDetector._index2containerId2len
for id in idx2id2len[self._index]:
yield None
if id in idx2id2len[self._index-1]:
diff = idx2id2len[self._index][id] - idx2id2len[self._index-1][id]
if diff > 0:
if diff > idx2id2len[self._index-1][id]:
minutes = (self._leakDetector._index2delay[self._index] -
self._leakDetector._index2delay[self._index-1]) / 60.
name = self._leakDetector.getContainerNameById(id)
if idx2id2len[self._index-1][id] != 0:
percent = 100. * (float(diff) / float(idx2id2len[self._index-1][id]))
for container in self._leakDetector.getContainerByIdGen(id):
yield None
self.notify.warning(
'%s (%s) grew %.2f%% in %.2f minutes (%s items at last measurement, current contents: %s)' % (
name, itype(container), percent, minutes, idx2id2len[self._index][id],
fastRepr(container, maxLen=CheckContainers.ReprItems)))
yield None
if (self._index > 2 and
id in idx2id2len[self._index-2] and
id in idx2id2len[self._index-3]):
diff2 = idx2id2len[self._index-1][id] - idx2id2len[self._index-2][id]
diff3 = idx2id2len[self._index-2][id] - idx2id2len[self._index-3][id]
if self._index <= 4:
if diff > 0 and diff2 > 0 and diff3 > 0:
name = self._leakDetector.getContainerNameById(id)
for container in self._leakDetector.getContainerByIdGen(id):
yield None
msg = ('%s (%s) consistently increased in size over the last '
'3 periods (%s items at last measurement, current contents: %s)' %
(name, itype(container), idx2id2len[self._index][id],
fastRepr(container, maxLen=CheckContainers.ReprItems)))
self.notify.warning(msg)
yield None
elif (id in idx2id2len[self._index-4] and
id in idx2id2len[self._index-5]):
# if size has consistently increased over the last 5 checks,
# send out a warning
diff4 = idx2id2len[self._index-3][id] - idx2id2len[self._index-4][id]
diff5 = idx2id2len[self._index-4][id] - idx2id2len[self._index-5][id]
if diff > 0 and diff2 > 0 and diff3 > 0 and diff4 > 0 and diff5 > 0:
name = self._leakDetector.getContainerNameById(id)
for container in self._leakDetector.getContainerByIdGen(id):
yield None
msg = ('%s (%s) consistently increased in size over the last '
'5 periods (%s items at last measurement, current contents: %s)' %
(name, itype(container), idx2id2len[self._index][id],
fastRepr(container, maxLen=CheckContainers.ReprItems)))
self.notify.warning(msg)
self.notify.warning('sending notification...')
yield None
for result in self._leakDetector.getContainerByIdGen(id):
yield None
container = result
messenger.send(self._leakDetector.getLeakEvent(), [container, name])
yield Job.Done
def _isDeadEnd(self, obj, objName=None):
if type(obj) in (types.BooleanType, types.BuiltinFunctionType,
types.BuiltinMethodType, types.ComplexType,
types.FloatType, types.IntType, types.LongType,
types.NoneType, types.NotImplementedType,
types.TypeType, types.CodeType, types.FunctionType,
types.StringType, types.UnicodeType,
types.TupleType):
return True
# if it's an internal object, ignore it
if id(obj) in ContainerLeakDetector.PrivateIds:
return True
if objName in ('im_self', 'im_class'):
return True
try:
className = obj.__class__.__name__
except:
class PruneContainerRefs(Job):
"""
Job to destroy any container refs that are no longer valid.
Checks validity by asking for each container
"""
def __init__(self, name, leakDetector):
Job.__init__(self, name)
self._leakDetector = leakDetector
self.notify = self._leakDetector.notify
ContainerLeakDetector.addPrivateObj(self.__dict__)
def destroy(self):
ContainerLeakDetector.removePrivateObj(self.__dict__)
Job.destroy(self)
def getPriority(self):
return Job.Priorities.Normal
def run(self):
ids = self._leakDetector.getContainerIds()
for id in ids:
yield None
try:
for result in self._leakDetector.getContainerByIdGen(id):
yield None
container = result
except:
# reference is invalid, remove it
self._leakDetector.removeContainerById(id)
yield Job.Done
class ContainerLeakDetector(Job):
"""
Low-priority Python object-graph walker that looks for leaking containers.
To reduce memory usage, this does a random walk of the Python objects to
discover containers rather than keep a set of all visited objects; it may
visit the same object many times but eventually it will discover every object.
Checks container sizes at ever-increasing intervals.
"""
notify = directNotify.newCategory("ContainerLeakDetector")
# set of containers that should not be examined
PrivateIds = set()
def __init__(self, name, firstCheckDelay = None):
Job.__init__(self, name)
self._serialNum = serialNum()
self._findContainersJob = None
self._checkContainersJob = None
self._pruneContainersJob = None
if firstCheckDelay is None:
firstCheckDelay = 60. * (15./2)
self._nextCheckDelay = firstCheckDelay
self._pruneTaskPeriod = config.GetFloat('leak-detector-prune-period', 60. * 30.)
# main dict of id(container)->containerRef
self._id2ref = {}
# storage for results of check-container job
self._index2containerId2len = {}
self._index2delay = {}
if config.GetBool('leak-container', 0):
_createContainerLeak()
# don't check our own tables for leaks
ContainerLeakDetector.addPrivateObj(ContainerLeakDetector.PrivateIds)
ContainerLeakDetector.addPrivateObj(self.__dict__)
jobMgr.add(self)
def destroy(self):
self.ignoreAll()
if self._pruneContainersJob is not None:
jobMgr.remove(self._pruneContainersJob)
self._pruneContainersJob = None
if self._checkContainersJob is not None:
jobMgr.remove(self._checkContainersJob)
self._checkContainersJob = None
jobMgr.remove(self._findContainersJob)
self._findContainersJob = None
del self._id2ref
del self._index2containerId2len
del self._index2delay
def getLeakEvent(self):
# passes description string as argument
return 'containerLeakDetected-%s' % self._serialNum
def getPriority(self):
return Job.Priorities.Min
@classmethod
def addPrivateObj(cls, obj):
cls.PrivateIds.add(id(obj))
@classmethod
def removePrivateObj(cls, obj):
cls.PrivateIds.remove(id(obj))
def _getCheckTaskName(self):
return 'checkForLeakingContainers-%s' % self._serialNum
def _getPruneTaskName(self):
return 'pruneLeakingContainerRefs-%s' % self._serialNum
def getContainerIds(self):
return self._id2ref.keys()
def getContainerByIdGen(self, id):
# return a generator to look up a container
return self._id2ref[id].getContainer()
def getContainerById(self, id):
for result in self._id2ref[id].getContainer():
pass
else:
# prevent infinite recursion in built-in containers related to methods
if className == 'method-wrapper':
return True
return False
return result
def getContainerNameByIdGen(self, id):
return self._id2ref[id].getNameGen()
def getContainerNameById(self, id):
if id in self._id2ref:
return repr(self._id2ref[id])
return '<unknown container>'
def removeContainerById(self, id):
if id in self._id2ref:
self._id2ref[id].destroy()
del self._id2ref[id]
def _isContainer(self, obj):
try:
len(obj)
except:
return False
return True
def run(self):
# start looking for containers
self._findContainersJob = FindContainers(
'%s-findContainers' % self.getJobName(), self)
jobMgr.add(self._findContainersJob)
def _nameContainerGen(self, cont, objRef):
"""
if self.notify.getDebug():
self.notify.debug('_nameContainer: %s' % objRef)
#printStack()
"""
contId = id(cont)
# if this container is new, or the objRef repr is shorter than what we already have,
# put it in the table
if contId in self._id2ref:
for existingRepr in self._id2ref[contId].getNameGen():
yield None
for newRepr in objRef.getNameGen():
yield None
if contId not in self._id2ref or len(newRepr) < len(existingRepr):
if contId in self._id2ref:
self.removeContainerById(contId)
self._id2ref[contId] = objRef
self._scheduleNextLeakCheck()
self._scheduleNextPruning()
while True:
yield Job.Sleep
def _scheduleNextLeakCheck(self):
taskMgr.doMethodLater(self._nextCheckDelay, self._checkForLeaks,
self._getCheckTaskName())
self._nextCheckDelay *= 2
# delay between checks
# fib: 1 1 2 3 5 8 13 21 34 55 89
# * 2.: 1 2 4 8 16 32 64 128 256 512 1024
# * 1.5: 1 1.5 2.3 3.4 5.1 7.6 11.4 17.1 25.6 38.4 57.7
#
# delay from job start
# fib: 1 2 4 7 12 20 33 54 88 143 232
# * 2.: 1 3 7 15 31 63 127 255 511 1023 2047
# * 1.5: 1 2.5 4.75 8.1 13.2 20.8 32.2 49.3 74.9 113.3 171
self._nextCheckDelay = self._nextCheckDelay * 1.5
def _checkForLeaks(self, task=None):
self._index2delay[len(self._index2containerId2len)] = self._nextCheckDelay