diff --git a/direct/src/showbase/Messenger.py b/direct/src/showbase/Messenger.py index 8b5abcf32e..e5740d0d89 100644 --- a/direct/src/showbase/Messenger.py +++ b/direct/src/showbase/Messenger.py @@ -5,6 +5,7 @@ __all__ = ['Messenger'] from PythonUtil import * from direct.directnotify import DirectNotifyGlobal +from direct.stdpy.threading import Lock import types class Messenger: @@ -34,6 +35,14 @@ class Messenger: self._messengerIdGen = 0 self._id2object = {} + # A mapping of taskChain -> eventList, used for sending events + # across task chains (and therefore across threads). + self._eventQueuesByTaskChain = {} + + # This protects the data structures within this object from + # multithreaded access. + self.lock = Lock() + if __debug__: self.__isWatching=0 self.__watching={} @@ -58,7 +67,7 @@ class Messenger: def _storeObject(self, object): # store reference-counted reference to object in case we need to - # retrieve it later + # retrieve it later. assumes lock is held. id = self._getMessengerId(object) if id not in self._id2object: self._id2object[id] = [1, object] @@ -69,6 +78,7 @@ class Messenger: return self._id2object[id][1] def _releaseObject(self, object): + # assumes lock is held. id = self._getMessengerId(object) if id in self._id2object: record = self._id2object[id] @@ -97,30 +107,34 @@ class Messenger: "method not callable in accept (ignoring): %s %s"% (method, extraArgs)) - acceptorDict = self.__callbacks.setdefault(event, {}) + self.lock.acquire() + try: + acceptorDict = self.__callbacks.setdefault(event, {}) - id = self._getMessengerId(object) + id = self._getMessengerId(object) - # Make sure we are not inadvertently overwriting an existing event - # on this particular object. - if notifyDebug: - if acceptorDict.has_key(id): - oldMethod = acceptorDict[id][0] - if oldMethod == method: - self.notify.warning( - "object: %s was already accepting: \"%s\" with same callback: %s()" % - (object.__class__.__name__, event, method.__name__)) - else: - self.notify.warning( - "object: %s accept: \"%s\" new callback: %s() supplanting old callback: %s()" % - (object.__class__.__name__, event, method.__name__, oldMethod.__name__)) + # Make sure we are not inadvertently overwriting an existing event + # on this particular object. + if notifyDebug: + if acceptorDict.has_key(id): + oldMethod = acceptorDict[id][0] + if oldMethod == method: + self.notify.warning( + "object: %s was already accepting: \"%s\" with same callback: %s()" % + (object.__class__.__name__, event, method.__name__)) + else: + self.notify.warning( + "object: %s accept: \"%s\" new callback: %s() supplanting old callback: %s()" % + (object.__class__.__name__, event, method.__name__, oldMethod.__name__)) - acceptorDict[id] = [method, extraArgs, persistent] + acceptorDict[id] = [method, extraArgs, persistent] - # Remember that this object is listening for this event - eventDict = self.__objectEvents.setdefault(id, {}) - eventDict.setdefault(event, None) - self._storeObject(object) + # Remember that this object is listening for this event + eventDict = self.__objectEvents.setdefault(id, {}) + eventDict.setdefault(event, None) + self._storeObject(object) + finally: + self.lock.release() def ignore(self, event, object): """ ignore(self, string, DirectObject) @@ -130,26 +144,30 @@ class Messenger: if Messenger.notify.getDebug(): Messenger.notify.debug(`object` + '\n now ignoring: ' + `event`) - id = self._getMessengerId(object) + self.lock.acquire() + try: + id = self._getMessengerId(object) - # Find the dictionary of all the objects accepting this event - acceptorDict = self.__callbacks.get(event) - # If this object is there, delete it from the dictionary - if acceptorDict and acceptorDict.has_key(id): - del acceptorDict[id] - # If this dictionary is now empty, remove the event - # entry from the Messenger alltogether - if (len(acceptorDict) == 0): - del self.__callbacks[event] + # Find the dictionary of all the objects accepting this event + acceptorDict = self.__callbacks.get(event) + # If this object is there, delete it from the dictionary + if acceptorDict and acceptorDict.has_key(id): + del acceptorDict[id] + # If this dictionary is now empty, remove the event + # entry from the Messenger alltogether + if (len(acceptorDict) == 0): + del self.__callbacks[event] - # This object is no longer listening for this event - eventDict = self.__objectEvents.get(id) - if eventDict and eventDict.has_key(event): - del eventDict[event] - if (len(eventDict) == 0): - del self.__objectEvents[id] + # This object is no longer listening for this event + eventDict = self.__objectEvents.get(id) + if eventDict and eventDict.has_key(event): + del eventDict[event] + if (len(eventDict) == 0): + del self.__objectEvents[id] - self._releaseObject(object) + self._releaseObject(object) + finally: + self.lock.release() def ignoreAll(self, object): """ @@ -158,48 +176,61 @@ class Messenger: """ if Messenger.notify.getDebug(): Messenger.notify.debug(`object` + '\n now ignoring all events') - id = self._getMessengerId(object) - # Get the list of events this object is listening to - eventDict = self.__objectEvents.get(id) - if eventDict: - for event in eventDict.keys(): - # Find the dictionary of all the objects accepting this event - acceptorDict = self.__callbacks.get(event) - # If this object is there, delete it from the dictionary - if acceptorDict and acceptorDict.has_key(id): - del acceptorDict[id] - # If this dictionary is now empty, remove the event - # entry from the Messenger alltogether - if (len(acceptorDict) == 0): - del self.__callbacks[event] - del self.__objectEvents[id] - if id in self._id2object: - del self._id2object[id] + + self.lock.acquire() + try: + id = self._getMessengerId(object) + # Get the list of events this object is listening to + eventDict = self.__objectEvents.get(id) + if eventDict: + for event in eventDict.keys(): + # Find the dictionary of all the objects accepting this event + acceptorDict = self.__callbacks.get(event) + # If this object is there, delete it from the dictionary + if acceptorDict and acceptorDict.has_key(id): + del acceptorDict[id] + # If this dictionary is now empty, remove the event + # entry from the Messenger alltogether + if (len(acceptorDict) == 0): + del self.__callbacks[event] + del self.__objectEvents[id] + if id in self._id2object: + del self._id2object[id] + finally: + self.lock.release() def getAllAccepting(self, object): """ Returns the list of all events accepted by the indicated object. """ - id = self._getMessengerId(object) + self.lock.acquire() + try: + id = self._getMessengerId(object) - # Get the list of events this object is listening to - eventDict = self.__objectEvents.get(id) - if eventDict: - return eventDict.keys() - return [] + # Get the list of events this object is listening to + eventDict = self.__objectEvents.get(id) + if eventDict: + return eventDict.keys() + return [] + finally: + self.lock.release() def isAccepting(self, event, object): """ isAccepting(self, string, DirectOject) Is this object accepting this event? """ - acceptorDict = self.__callbacks.get(event) - id = self._getMessengerId(object) - if acceptorDict and acceptorDict.has_key(id): - # Found it, return true - return 1 - # If we looked in both dictionaries and made it here - # that object must not be accepting that event. - return 0 + self.lock.acquire() + try: + acceptorDict = self.__callbacks.get(event) + id = self._getMessengerId(object) + if acceptorDict and acceptorDict.has_key(id): + # Found it, return true + return 1 + # If we looked in both dictionaries and made it here + # that object must not be accepting that event. + return 0 + finally: + self.lock.release() def whoAccepts(self, event): """ @@ -213,30 +244,59 @@ class Messenger: """ return (not self.isAccepting(event, object)) - def send(self, event, sentArgs=[]): + def send(self, event, sentArgs=[], taskChain = None): """ + Send this event, optionally passing in arguments + event is usually a string. sentArgs is a list of any data that you want passed along to the handlers listening to this event. - Send this event, optionally passing in arguments + If taskChain is not None, it is the name of the task chain + which should receive the event. If taskChain is None, the + event is handled immediately. Setting a non-None taskChain + will defer the event (possibly till next frame or even later) + and create a new, temporary task within the named taskChain, + but this is the only way to send an event across threads. """ if Messenger.notify.getDebug() and not self.quieting.get(event): assert Messenger.notify.debug( - 'sent event: ' + event + ' sentArgs: ' + `sentArgs`) - if __debug__: - foundWatch=0 - if self.__isWatching: - for i in self.__watching.keys(): - if str(event).find(i) >= 0: - foundWatch=1 - break - acceptorDict = self.__callbacks.get(event) - if not acceptorDict: + 'sent event: %s sentArgs = %s, taskChain = %s' % ( + event, sentArgs, taskChain)) + + self.lock.acquire() + try: if __debug__: - if foundWatch: - print "Messenger: \"%s\" was sent, but no function in Python listened."%(event,) - return + foundWatch=0 + if self.__isWatching: + for i in self.__watching.keys(): + if str(event).find(i) >= 0: + foundWatch=1 + break + acceptorDict = self.__callbacks.get(event) + if not acceptorDict: + if __debug__: + if foundWatch: + print "Messenger: \"%s\" was sent, but no function in Python listened."%(event,) + return + + if taskChain: + # Queue the event onto the indicated task chain. + taskMgr.add(self.__lockAndDispatch, name = 'Messenger-%s-%s' % (event, taskChain), extraArgs = [acceptorDict, event, sentArgs, foundWatch], taskChain = taskChain) + else: + # Handle the event immediately. + self.__dispatch(acceptorDict, event, sentArgs, foundWatch) + finally: + self.lock.release() + + def __lockAndDispatch(self, acceptorDict, event, sentArgs, foundWatch): + self.lock.acquire() + try: + self.__dispatch(acceptorDict, event, sentArgs, foundWatch) + finally: + self.lock.release() + + def __dispatch(self, acceptorDict, event, sentArgs, foundWatch): for id in acceptorDict.keys(): # We have to make this apparently redundant check, because # it is possible that one object removes its own hooks @@ -283,14 +343,24 @@ class Messenger: # method itself might call accept() or acceptOnce() # again. assert callable(method) - method (*(extraArgs + sentArgs)) + + # Release the lock temporarily while we call the method. + self.lock.release() + try: + method (*(extraArgs + sentArgs)) + finally: + self.lock.acquire() def clear(self): """ Start fresh with a clear dict """ - self.__callbacks.clear() - self.__objectEvents.clear() + self.lock.acquire() + try: + self.__callbacks.clear() + self.__objectEvents.clear() + finally: + self.lock.release() def isEmpty(self): return (len(self.__callbacks) == 0)