mirror of
https://github.com/panda3d/panda3d.git
synced 2025-10-03 02:15:43 -04:00
thread-safe messenger
This commit is contained in:
parent
c3d1efc07d
commit
993fb111de
@ -5,6 +5,7 @@ __all__ = ['Messenger']
|
|||||||
|
|
||||||
from PythonUtil import *
|
from PythonUtil import *
|
||||||
from direct.directnotify import DirectNotifyGlobal
|
from direct.directnotify import DirectNotifyGlobal
|
||||||
|
from direct.stdpy.threading import Lock
|
||||||
import types
|
import types
|
||||||
|
|
||||||
class Messenger:
|
class Messenger:
|
||||||
@ -34,6 +35,14 @@ class Messenger:
|
|||||||
self._messengerIdGen = 0
|
self._messengerIdGen = 0
|
||||||
self._id2object = {}
|
self._id2object = {}
|
||||||
|
|
||||||
|
# A mapping of taskChain -> eventList, used for sending events
|
||||||
|
# across task chains (and therefore across threads).
|
||||||
|
self._eventQueuesByTaskChain = {}
|
||||||
|
|
||||||
|
# This protects the data structures within this object from
|
||||||
|
# multithreaded access.
|
||||||
|
self.lock = Lock()
|
||||||
|
|
||||||
if __debug__:
|
if __debug__:
|
||||||
self.__isWatching=0
|
self.__isWatching=0
|
||||||
self.__watching={}
|
self.__watching={}
|
||||||
@ -58,7 +67,7 @@ class Messenger:
|
|||||||
|
|
||||||
def _storeObject(self, object):
|
def _storeObject(self, object):
|
||||||
# store reference-counted reference to object in case we need to
|
# store reference-counted reference to object in case we need to
|
||||||
# retrieve it later
|
# retrieve it later. assumes lock is held.
|
||||||
id = self._getMessengerId(object)
|
id = self._getMessengerId(object)
|
||||||
if id not in self._id2object:
|
if id not in self._id2object:
|
||||||
self._id2object[id] = [1, object]
|
self._id2object[id] = [1, object]
|
||||||
@ -69,6 +78,7 @@ class Messenger:
|
|||||||
return self._id2object[id][1]
|
return self._id2object[id][1]
|
||||||
|
|
||||||
def _releaseObject(self, object):
|
def _releaseObject(self, object):
|
||||||
|
# assumes lock is held.
|
||||||
id = self._getMessengerId(object)
|
id = self._getMessengerId(object)
|
||||||
if id in self._id2object:
|
if id in self._id2object:
|
||||||
record = self._id2object[id]
|
record = self._id2object[id]
|
||||||
@ -97,30 +107,34 @@ class Messenger:
|
|||||||
"method not callable in accept (ignoring): %s %s"%
|
"method not callable in accept (ignoring): %s %s"%
|
||||||
(method, extraArgs))
|
(method, extraArgs))
|
||||||
|
|
||||||
acceptorDict = self.__callbacks.setdefault(event, {})
|
self.lock.acquire()
|
||||||
|
try:
|
||||||
|
acceptorDict = self.__callbacks.setdefault(event, {})
|
||||||
|
|
||||||
id = self._getMessengerId(object)
|
id = self._getMessengerId(object)
|
||||||
|
|
||||||
# Make sure we are not inadvertently overwriting an existing event
|
# Make sure we are not inadvertently overwriting an existing event
|
||||||
# on this particular object.
|
# on this particular object.
|
||||||
if notifyDebug:
|
if notifyDebug:
|
||||||
if acceptorDict.has_key(id):
|
if acceptorDict.has_key(id):
|
||||||
oldMethod = acceptorDict[id][0]
|
oldMethod = acceptorDict[id][0]
|
||||||
if oldMethod == method:
|
if oldMethod == method:
|
||||||
self.notify.warning(
|
self.notify.warning(
|
||||||
"object: %s was already accepting: \"%s\" with same callback: %s()" %
|
"object: %s was already accepting: \"%s\" with same callback: %s()" %
|
||||||
(object.__class__.__name__, event, method.__name__))
|
(object.__class__.__name__, event, method.__name__))
|
||||||
else:
|
else:
|
||||||
self.notify.warning(
|
self.notify.warning(
|
||||||
"object: %s accept: \"%s\" new callback: %s() supplanting old callback: %s()" %
|
"object: %s accept: \"%s\" new callback: %s() supplanting old callback: %s()" %
|
||||||
(object.__class__.__name__, event, method.__name__, oldMethod.__name__))
|
(object.__class__.__name__, event, method.__name__, oldMethod.__name__))
|
||||||
|
|
||||||
acceptorDict[id] = [method, extraArgs, persistent]
|
acceptorDict[id] = [method, extraArgs, persistent]
|
||||||
|
|
||||||
# Remember that this object is listening for this event
|
# Remember that this object is listening for this event
|
||||||
eventDict = self.__objectEvents.setdefault(id, {})
|
eventDict = self.__objectEvents.setdefault(id, {})
|
||||||
eventDict.setdefault(event, None)
|
eventDict.setdefault(event, None)
|
||||||
self._storeObject(object)
|
self._storeObject(object)
|
||||||
|
finally:
|
||||||
|
self.lock.release()
|
||||||
|
|
||||||
def ignore(self, event, object):
|
def ignore(self, event, object):
|
||||||
""" ignore(self, string, DirectObject)
|
""" ignore(self, string, DirectObject)
|
||||||
@ -130,26 +144,30 @@ class Messenger:
|
|||||||
if Messenger.notify.getDebug():
|
if Messenger.notify.getDebug():
|
||||||
Messenger.notify.debug(`object` + '\n now ignoring: ' + `event`)
|
Messenger.notify.debug(`object` + '\n now ignoring: ' + `event`)
|
||||||
|
|
||||||
id = self._getMessengerId(object)
|
self.lock.acquire()
|
||||||
|
try:
|
||||||
|
id = self._getMessengerId(object)
|
||||||
|
|
||||||
# Find the dictionary of all the objects accepting this event
|
# Find the dictionary of all the objects accepting this event
|
||||||
acceptorDict = self.__callbacks.get(event)
|
acceptorDict = self.__callbacks.get(event)
|
||||||
# If this object is there, delete it from the dictionary
|
# If this object is there, delete it from the dictionary
|
||||||
if acceptorDict and acceptorDict.has_key(id):
|
if acceptorDict and acceptorDict.has_key(id):
|
||||||
del acceptorDict[id]
|
del acceptorDict[id]
|
||||||
# If this dictionary is now empty, remove the event
|
# If this dictionary is now empty, remove the event
|
||||||
# entry from the Messenger alltogether
|
# entry from the Messenger alltogether
|
||||||
if (len(acceptorDict) == 0):
|
if (len(acceptorDict) == 0):
|
||||||
del self.__callbacks[event]
|
del self.__callbacks[event]
|
||||||
|
|
||||||
# This object is no longer listening for this event
|
# This object is no longer listening for this event
|
||||||
eventDict = self.__objectEvents.get(id)
|
eventDict = self.__objectEvents.get(id)
|
||||||
if eventDict and eventDict.has_key(event):
|
if eventDict and eventDict.has_key(event):
|
||||||
del eventDict[event]
|
del eventDict[event]
|
||||||
if (len(eventDict) == 0):
|
if (len(eventDict) == 0):
|
||||||
del self.__objectEvents[id]
|
del self.__objectEvents[id]
|
||||||
|
|
||||||
self._releaseObject(object)
|
self._releaseObject(object)
|
||||||
|
finally:
|
||||||
|
self.lock.release()
|
||||||
|
|
||||||
def ignoreAll(self, object):
|
def ignoreAll(self, object):
|
||||||
"""
|
"""
|
||||||
@ -158,48 +176,61 @@ class Messenger:
|
|||||||
"""
|
"""
|
||||||
if Messenger.notify.getDebug():
|
if Messenger.notify.getDebug():
|
||||||
Messenger.notify.debug(`object` + '\n now ignoring all events')
|
Messenger.notify.debug(`object` + '\n now ignoring all events')
|
||||||
id = self._getMessengerId(object)
|
|
||||||
# Get the list of events this object is listening to
|
self.lock.acquire()
|
||||||
eventDict = self.__objectEvents.get(id)
|
try:
|
||||||
if eventDict:
|
id = self._getMessengerId(object)
|
||||||
for event in eventDict.keys():
|
# Get the list of events this object is listening to
|
||||||
# Find the dictionary of all the objects accepting this event
|
eventDict = self.__objectEvents.get(id)
|
||||||
acceptorDict = self.__callbacks.get(event)
|
if eventDict:
|
||||||
# If this object is there, delete it from the dictionary
|
for event in eventDict.keys():
|
||||||
if acceptorDict and acceptorDict.has_key(id):
|
# Find the dictionary of all the objects accepting this event
|
||||||
del acceptorDict[id]
|
acceptorDict = self.__callbacks.get(event)
|
||||||
# If this dictionary is now empty, remove the event
|
# If this object is there, delete it from the dictionary
|
||||||
# entry from the Messenger alltogether
|
if acceptorDict and acceptorDict.has_key(id):
|
||||||
if (len(acceptorDict) == 0):
|
del acceptorDict[id]
|
||||||
del self.__callbacks[event]
|
# If this dictionary is now empty, remove the event
|
||||||
del self.__objectEvents[id]
|
# entry from the Messenger alltogether
|
||||||
if id in self._id2object:
|
if (len(acceptorDict) == 0):
|
||||||
del self._id2object[id]
|
del self.__callbacks[event]
|
||||||
|
del self.__objectEvents[id]
|
||||||
|
if id in self._id2object:
|
||||||
|
del self._id2object[id]
|
||||||
|
finally:
|
||||||
|
self.lock.release()
|
||||||
|
|
||||||
def getAllAccepting(self, object):
|
def getAllAccepting(self, object):
|
||||||
"""
|
"""
|
||||||
Returns the list of all events accepted by the indicated object.
|
Returns the list of all events accepted by the indicated object.
|
||||||
"""
|
"""
|
||||||
id = self._getMessengerId(object)
|
self.lock.acquire()
|
||||||
|
try:
|
||||||
|
id = self._getMessengerId(object)
|
||||||
|
|
||||||
# Get the list of events this object is listening to
|
# Get the list of events this object is listening to
|
||||||
eventDict = self.__objectEvents.get(id)
|
eventDict = self.__objectEvents.get(id)
|
||||||
if eventDict:
|
if eventDict:
|
||||||
return eventDict.keys()
|
return eventDict.keys()
|
||||||
return []
|
return []
|
||||||
|
finally:
|
||||||
|
self.lock.release()
|
||||||
|
|
||||||
def isAccepting(self, event, object):
|
def isAccepting(self, event, object):
|
||||||
""" isAccepting(self, string, DirectOject)
|
""" isAccepting(self, string, DirectOject)
|
||||||
Is this object accepting this event?
|
Is this object accepting this event?
|
||||||
"""
|
"""
|
||||||
acceptorDict = self.__callbacks.get(event)
|
self.lock.acquire()
|
||||||
id = self._getMessengerId(object)
|
try:
|
||||||
if acceptorDict and acceptorDict.has_key(id):
|
acceptorDict = self.__callbacks.get(event)
|
||||||
# Found it, return true
|
id = self._getMessengerId(object)
|
||||||
return 1
|
if acceptorDict and acceptorDict.has_key(id):
|
||||||
# If we looked in both dictionaries and made it here
|
# Found it, return true
|
||||||
# that object must not be accepting that event.
|
return 1
|
||||||
return 0
|
# If we looked in both dictionaries and made it here
|
||||||
|
# that object must not be accepting that event.
|
||||||
|
return 0
|
||||||
|
finally:
|
||||||
|
self.lock.release()
|
||||||
|
|
||||||
def whoAccepts(self, event):
|
def whoAccepts(self, event):
|
||||||
"""
|
"""
|
||||||
@ -213,30 +244,59 @@ class Messenger:
|
|||||||
"""
|
"""
|
||||||
return (not self.isAccepting(event, object))
|
return (not self.isAccepting(event, object))
|
||||||
|
|
||||||
def send(self, event, sentArgs=[]):
|
def send(self, event, sentArgs=[], taskChain = None):
|
||||||
"""
|
"""
|
||||||
|
Send this event, optionally passing in arguments
|
||||||
|
|
||||||
event is usually a string.
|
event is usually a string.
|
||||||
sentArgs is a list of any data that you want passed along to the
|
sentArgs is a list of any data that you want passed along to the
|
||||||
handlers listening to this event.
|
handlers listening to this event.
|
||||||
|
|
||||||
Send this event, optionally passing in arguments
|
If taskChain is not None, it is the name of the task chain
|
||||||
|
which should receive the event. If taskChain is None, the
|
||||||
|
event is handled immediately. Setting a non-None taskChain
|
||||||
|
will defer the event (possibly till next frame or even later)
|
||||||
|
and create a new, temporary task within the named taskChain,
|
||||||
|
but this is the only way to send an event across threads.
|
||||||
"""
|
"""
|
||||||
if Messenger.notify.getDebug() and not self.quieting.get(event):
|
if Messenger.notify.getDebug() and not self.quieting.get(event):
|
||||||
assert Messenger.notify.debug(
|
assert Messenger.notify.debug(
|
||||||
'sent event: ' + event + ' sentArgs: ' + `sentArgs`)
|
'sent event: %s sentArgs = %s, taskChain = %s' % (
|
||||||
if __debug__:
|
event, sentArgs, taskChain))
|
||||||
foundWatch=0
|
|
||||||
if self.__isWatching:
|
self.lock.acquire()
|
||||||
for i in self.__watching.keys():
|
try:
|
||||||
if str(event).find(i) >= 0:
|
|
||||||
foundWatch=1
|
|
||||||
break
|
|
||||||
acceptorDict = self.__callbacks.get(event)
|
|
||||||
if not acceptorDict:
|
|
||||||
if __debug__:
|
if __debug__:
|
||||||
if foundWatch:
|
foundWatch=0
|
||||||
print "Messenger: \"%s\" was sent, but no function in Python listened."%(event,)
|
if self.__isWatching:
|
||||||
return
|
for i in self.__watching.keys():
|
||||||
|
if str(event).find(i) >= 0:
|
||||||
|
foundWatch=1
|
||||||
|
break
|
||||||
|
acceptorDict = self.__callbacks.get(event)
|
||||||
|
if not acceptorDict:
|
||||||
|
if __debug__:
|
||||||
|
if foundWatch:
|
||||||
|
print "Messenger: \"%s\" was sent, but no function in Python listened."%(event,)
|
||||||
|
return
|
||||||
|
|
||||||
|
if taskChain:
|
||||||
|
# Queue the event onto the indicated task chain.
|
||||||
|
taskMgr.add(self.__lockAndDispatch, name = 'Messenger-%s-%s' % (event, taskChain), extraArgs = [acceptorDict, event, sentArgs, foundWatch], taskChain = taskChain)
|
||||||
|
else:
|
||||||
|
# Handle the event immediately.
|
||||||
|
self.__dispatch(acceptorDict, event, sentArgs, foundWatch)
|
||||||
|
finally:
|
||||||
|
self.lock.release()
|
||||||
|
|
||||||
|
def __lockAndDispatch(self, acceptorDict, event, sentArgs, foundWatch):
|
||||||
|
self.lock.acquire()
|
||||||
|
try:
|
||||||
|
self.__dispatch(acceptorDict, event, sentArgs, foundWatch)
|
||||||
|
finally:
|
||||||
|
self.lock.release()
|
||||||
|
|
||||||
|
def __dispatch(self, acceptorDict, event, sentArgs, foundWatch):
|
||||||
for id in acceptorDict.keys():
|
for id in acceptorDict.keys():
|
||||||
# We have to make this apparently redundant check, because
|
# We have to make this apparently redundant check, because
|
||||||
# it is possible that one object removes its own hooks
|
# it is possible that one object removes its own hooks
|
||||||
@ -283,14 +343,24 @@ class Messenger:
|
|||||||
# method itself might call accept() or acceptOnce()
|
# method itself might call accept() or acceptOnce()
|
||||||
# again.
|
# again.
|
||||||
assert callable(method)
|
assert callable(method)
|
||||||
method (*(extraArgs + sentArgs))
|
|
||||||
|
# Release the lock temporarily while we call the method.
|
||||||
|
self.lock.release()
|
||||||
|
try:
|
||||||
|
method (*(extraArgs + sentArgs))
|
||||||
|
finally:
|
||||||
|
self.lock.acquire()
|
||||||
|
|
||||||
def clear(self):
|
def clear(self):
|
||||||
"""
|
"""
|
||||||
Start fresh with a clear dict
|
Start fresh with a clear dict
|
||||||
"""
|
"""
|
||||||
self.__callbacks.clear()
|
self.lock.acquire()
|
||||||
self.__objectEvents.clear()
|
try:
|
||||||
|
self.__callbacks.clear()
|
||||||
|
self.__objectEvents.clear()
|
||||||
|
finally:
|
||||||
|
self.lock.release()
|
||||||
|
|
||||||
def isEmpty(self):
|
def isEmpty(self):
|
||||||
return (len(self.__callbacks) == 0)
|
return (len(self.__callbacks) == 0)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user