separate out cmu and disney distributed systems better

This commit is contained in:
David Rose 2009-06-08 18:49:09 +00:00
parent f5ba889b71
commit 7b00b88f29
23 changed files with 1259 additions and 552 deletions

View File

@ -949,7 +949,7 @@ ai_format_update_msg_type(const string &field_name, DOID_TYPE do_id,
#ifdef HAVE_PYTHON #ifdef HAVE_PYTHON
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function: DCClass::client_format_generate // Function: DCClass::client_format_generate_CMU
// Access: Published // Access: Published
// Description: Generates a datagram containing the message necessary // Description: Generates a datagram containing the message necessary
// to generate a new distributed object from the client. // to generate a new distributed object from the client.
@ -958,21 +958,16 @@ ai_format_update_msg_type(const string &field_name, DOID_TYPE do_id,
// //
// optional_fields is a list of fieldNames to generate // optional_fields is a list of fieldNames to generate
// in addition to the normal required fields. // in addition to the normal required fields.
//
// This method is only called by the CMU implementation.
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
Datagram DCClass:: Datagram DCClass::
client_format_generate(PyObject *distobj, DOID_TYPE do_id, client_format_generate_CMU(PyObject *distobj, DOID_TYPE do_id,
ZONEID_TYPE zone_id, PyObject *optional_fields) const { ZONEID_TYPE zone_id,
PyObject *optional_fields) const {
DCPacker packer; DCPacker packer;
//packer.raw_pack_uint8('A'); packer.raw_pack_uint16(CLIENT_OBJECT_GENERATE_CMU);
bool has_optional_fields = (PyObject_IsTrue(optional_fields) != 0);
if (has_optional_fields) {
packer.raw_pack_uint16(CLIENT_CREATE_OBJECT_REQUIRED_OTHER);
} else {
packer.raw_pack_uint16(CLIENT_CREATE_OBJECT_REQUIRED);
}
packer.raw_pack_uint32(zone_id); packer.raw_pack_uint32(zone_id);
packer.raw_pack_uint16(_number); packer.raw_pack_uint16(_number);
@ -992,8 +987,10 @@ client_format_generate(PyObject *distobj, DOID_TYPE do_id,
} }
// Also specify the optional fields. // Also specify the optional fields.
if (has_optional_fields) { int num_optional_fields = 0;
int num_optional_fields = PySequence_Size(optional_fields); if (PyObject_IsTrue(optional_fields)) {
num_optional_fields = PySequence_Size(optional_fields);
}
packer.raw_pack_uint16(num_optional_fields); packer.raw_pack_uint16(num_optional_fields);
for (int i = 0; i < num_optional_fields; i++) { for (int i = 0; i < num_optional_fields; i++) {
@ -1016,7 +1013,6 @@ client_format_generate(PyObject *distobj, DOID_TYPE do_id,
} }
packer.end_pack(); packer.end_pack();
} }
}
return Datagram(packer.get_data(), packer.get_length()); return Datagram(packer.get_data(), packer.get_length());
} }

View File

@ -117,8 +117,8 @@ PUBLISHED:
Datagram ai_format_generate(PyObject *distobj, DOID_TYPE do_id, ZONEID_TYPE parent_id, ZONEID_TYPE zone_id, Datagram ai_format_generate(PyObject *distobj, DOID_TYPE do_id, ZONEID_TYPE parent_id, ZONEID_TYPE zone_id,
CHANNEL_TYPE district_channel_id, CHANNEL_TYPE from_channel_id, CHANNEL_TYPE district_channel_id, CHANNEL_TYPE from_channel_id,
PyObject *optional_fields) const; PyObject *optional_fields) const;
Datagram client_format_generate(PyObject *distobj, DOID_TYPE do_id, ZONEID_TYPE zone_id, Datagram client_format_generate_CMU(PyObject *distobj, DOID_TYPE do_id,
PyObject *optional_fields) const; ZONEID_TYPE zone_id, PyObject *optional_fields) const;
Datagram ai_database_generate_context(unsigned int context_id, DOID_TYPE parent_id, ZONEID_TYPE zone_id, CHANNEL_TYPE owner_channel, Datagram ai_database_generate_context(unsigned int context_id, DOID_TYPE parent_id, ZONEID_TYPE zone_id, CHANNEL_TYPE owner_channel,
CHANNEL_TYPE database_server_id, CHANNEL_TYPE from_channel_id) const; CHANNEL_TYPE database_server_id, CHANNEL_TYPE from_channel_id) const;

View File

@ -30,5 +30,7 @@
#define STATESERVER_OBJECT_CREATE_WITH_REQUIR_OTHER_CONTEXT 2051 #define STATESERVER_OBJECT_CREATE_WITH_REQUIR_OTHER_CONTEXT 2051
#define STATESERVER_BOUNCE_MESSAGE 2086 #define STATESERVER_BOUNCE_MESSAGE 2086
#define CLIENT_OBJECT_GENERATE_CMU 9002
#endif #endif

View File

@ -1,6 +1,12 @@
"""ClientRepository module: contains the ClientRepository class""" """ClientRepository module: contains the ClientRepository class"""
from ClientRepositoryBase import * from ClientRepositoryBase import ClientRepositoryBase
from direct.directnotify import DirectNotifyGlobal
from MsgTypesCMU import *
from PyDatagram import PyDatagram
from PyDatagramIterator import PyDatagramIterator
from pandac.PandaModules import UniqueIdAllocator
import types
class ClientRepository(ClientRepositoryBase): class ClientRepository(ClientRepositoryBase):
""" """
@ -12,117 +18,249 @@ class ClientRepository(ClientRepositoryBase):
""" """
notify = DirectNotifyGlobal.directNotify.newCategory("ClientRepository") notify = DirectNotifyGlobal.directNotify.newCategory("ClientRepository")
def __init__(self, dcFileNames = None): # This is required by DoCollectionManager, even though it's not
ClientRepositoryBase.__init__(self, dcFileNames = dcFileNames) # used by this implementation.
GameGlobalsId = 0
# The DOID allocator. The CMU LAN server may choose to def __init__(self, dcFileNames = None, dcSuffix = ''):
# send us a block of DOIDs. If it chooses to do so, then we ClientRepositoryBase.__init__(self, dcFileNames = dcFileNames, dcSuffix = dcSuffix)
# may create objects, using those DOIDs. self.setHandleDatagramsInternally(False)
self.DOIDbase = 0
self.DOIDnext = 0
self.DOIDlast = 0
def handleSetDOIDrange(self, di): # The doId allocator. The CMU LAN server may choose to
self.DOIDbase = di.getUint32() # send us a block of doIds. If it chooses to do so, then we
self.DOIDlast = self.DOIDbase + di.getUint32() # may create objects, using those doIds.
self.DOIDnext = self.DOIDbase self.doIdAllocator = None
self.doIdBase = 0
self.doIdLast = 0
# The doIdBase of the client message currently being
# processed.
self.currentSenderId = None
def handleSetDoIdrange(self, di):
self.doIdBase = di.getUint32()
self.doIdLast = self.doIdBase + di.getUint32()
self.doIdAllocator = UniqueIdAllocator(self.doIdBase, self.doIdLast - 1)
# Now that we've got a doId range, we can safely generate new
# distributed objects.
messenger.send('createReady')
def handleRequestGenerates(self, di): def handleRequestGenerates(self, di):
# When new clients join the zone of an object, they need to hear # When new clients join the zone of an object, they need to hear
# about it, so we send out all of our information about objects in # about it, so we send out all of our information about objects in
# that particular zone. # that particular zone.
assert self.DOIDnext < self.DOIDlast
zone = di.getUint32() zone = di.getUint32()
for obj in self.doId2do.values(): for obj in self.doId2do.values():
if obj.zone == zone: if obj.zoneId == zone:
id = obj.doId if (self.isLocalId(obj.doId)):
if (self.isLocalId(id)): self.resendGenerate(obj)
self.send(obj.dclass.clientFormatGenerate(obj, id, zone, []))
def createWithRequired(self, className, zoneId = 0, optionalFields=None): def resendGenerate(self, obj):
if self.DOIDnext >= self.DOIDlast: """ Sends the generate message again for an already-generated
self.notify.error( object, presumably to inform any newly-arrived clients of this
"Cannot allocate a distributed object ID: all IDs used up.") object's current state. """
return None
id = self.DOIDnext # get the list of "ram" fields that aren't
self.DOIDnext = self.DOIDnext + 1 # required. These are fields whose values should
dclass = self.dclassesByName[className] # persist even if they haven't been received
# lately, so we have to re-broadcast these values
# in case the new client hasn't heard their latest
# values.
extraFields = []
for i in range(obj.dclass.getNumInheritedFields()):
field = obj.dclass.getInheritedField(i)
if field.hasKeyword('broadcast') and field.hasKeyword('ram') and not field.hasKeyword('required'):
if field.asMolecularField():
# It's a molecular field; this means
# we have to pack the components.
# Fortunately, we'll find those
# separately through the iteration, so
# we can ignore this field itself.
continue
extraFields.append(field.getName())
datagram = self.formatGenerate(obj, extraFields)
self.send(datagram)
def handleGenerate(self, di):
self.currentSenderId = di.getUint32()
zoneId = di.getUint32()
classId = di.getUint16()
doId = di.getUint32()
# Look up the dclass
dclass = self.dclassesByNumber[classId]
distObj = self.doId2do.get(doId)
if distObj and distObj.dclass == dclass:
# We've already got this object. Probably this is just a
# repeat-generate, synthesized for the benefit of someone
# else who just entered the zone. Accept the new updates,
# but don't make a formal generate.
assert(self.notify.debug("performing generate-update for %s %s" % (dclass.getName(), doId)))
dclass.receiveUpdateBroadcastRequired(distObj, di)
dclass.receiveUpdateOther(distObj, di)
return
assert(self.notify.debug("performing generate for %s %s" % (dclass.getName(), doId)))
dclass.startGenerate()
# Create a new distributed object, and put it in the dictionary
distObj = self.generateWithRequiredOtherFields(dclass, doId, di, 0, zoneId)
dclass.stopGenerate()
def allocateDoId(self):
""" Returns a newly-allocated doId. Call freeDoId() when the
object has been deleted. """
return self.doIdAllocator.allocate()
def freeDoId(self, doId):
""" Returns a doId back into the free pool for re-use. """
assert self.isLocalId(doId)
self.doIdAllocator.free(doId)
def createDistributedObject(self, className = None, distObj = None,
zoneId = 0, optionalFields = None):
""" To create a DistributedObject, you must pass in either the
name of the object's class, or an already-created instance of
the class (or both). If you pass in just a class name (to the
className parameter), then a default instance of the object
will be created, with whatever parameters the default
constructor supplies. Alternatively, if you wish to create
some initial values different from the default, you can create
the instance yourself and supply it to the distObj parameter,
then that instance will be used instead. (It should be a
newly-created object, not one that has already been manifested
on the network or previously passed through
createDistributedObject.) In either case, the new
DistributedObject is returned from this method.
This method will issue the appropriate network commands to
make this object appear on all of the other clients.
You should supply an initial zoneId in which to manifest the
object. The fields marked "required" or "ram" will be
broadcast to all of the other clients; if you wish to
broadcast additional field values at this time as well, pass a
list of field names in the optionalFields parameters.
"""
if not className:
if not distObj:
self.notify.error("Must specify either a className or a distObj.")
className = distObj.__class__.__name__
doId = self.allocateDoId()
dclass = self.dclassesByName.get(className)
if not dclass:
self.notify.error("Unknown distributed class: %s" % (distObj.__class__))
classDef = dclass.getClassDef() classDef = dclass.getClassDef()
if classDef == None: if classDef == None:
self.notify.error("Could not create an undefined %s object." % ( self.notify.error("Could not create an undefined %s object." % (
dclass.getName())) dclass.getName()))
obj = classDef(self)
obj.dclass = dclass
obj.zone = zoneId
obj.doId = id
self.doId2do[id] = obj
obj.generateInit()
obj._retrieveCachedData()
obj.generate()
obj.announceGenerate()
datagram = dclass.clientFormatGenerate(obj, id, zoneId, optionalFields)
self.send(datagram)
return obj
def sendDisableMsg(self, doId): if not distObj:
datagram = PyDatagram() distObj = classDef(self)
datagram.addUint16(CLIENT_OBJECT_DISABLE) if not isinstance(distObj, classDef):
datagram.addUint32(doId) self.notify.error("Object %s is not an instance of %s" % (distObj.__class__.__name__, classDef.__name__))
distObj.dclass = dclass
distObj.doId = doId
self.doId2do[doId] = distObj
distObj.generateInit()
distObj._retrieveCachedData()
distObj.generate()
distObj.setLocation(0, zoneId)
distObj.announceGenerate()
datagram = self.formatGenerate(distObj, optionalFields)
self.send(datagram) self.send(datagram)
return distObj
def formatGenerate(self, distObj, extraFields):
""" Returns a datagram formatted for sending the generate message for the indicated object. """
return distObj.dclass.clientFormatGenerateCMU(distObj, distObj.doId, distObj.zoneId, extraFields)
def sendDeleteMsg(self, doId): def sendDeleteMsg(self, doId):
datagram = PyDatagram() datagram = PyDatagram()
datagram.addUint16(CLIENT_OBJECT_DELETE) datagram.addUint16(OBJECT_DELETE_CMU)
datagram.addUint32(doId) datagram.addUint32(doId)
self.send(datagram) self.send(datagram)
def sendRemoveZoneMsg(self, zoneId, visibleZoneList=None): def sendDisconnect(self):
datagram = PyDatagram() if self.isConnected():
datagram.addUint16(CLIENT_REMOVE_ZONE) # Tell the game server that we're going:
datagram.addUint32(zoneId)
# if we have an explicit list of visible zones, add them
if visibleZoneList is not None:
vzl = list(visibleZoneList)
vzl.sort()
assert PythonUtil.uniqueElements(vzl)
for zone in vzl:
datagram.addUint32(zone)
# send the message
self.send(datagram)
def sendUpdateZone(self, obj, zoneId):
id = obj.doId
assert self.isLocalId(id)
self.sendDeleteMsg(id, 1)
obj.zone = zoneId
self.send(obj.dclass.clientFormatGenerate(obj, id, zoneId, []))
def sendSetZoneMsg(self, zoneId, visibleZoneList=None):
datagram = PyDatagram() datagram = PyDatagram()
# Add message type # Add message type
datagram.addUint16(CLIENT_SET_ZONE_CMU) datagram.addUint16(CLIENT_DISCONNECT_CMU)
# Add zone id # Send the message
datagram.addUint32(zoneId) self.send(datagram)
self.notify.info("Sent disconnect message to server")
self.disconnect()
self.stopHeartbeat()
# if we have an explicit list of visible zones, add them def setInterestZones(self, interestZoneIds):
if visibleZoneList is not None: """ Changes the set of zones that this particular client is
vzl = list(visibleZoneList) interested in hearing about. """
vzl.sort()
assert PythonUtil.uniqueElements(vzl) datagram = PyDatagram()
for zone in vzl: # Add message type
datagram.addUint32(zone) datagram.addUint16(CLIENT_SET_INTEREST_CMU)
for zoneId in interestZoneIds:
datagram.addUint32(zoneId)
# send the message # send the message
self.send(datagram) self.send(datagram)
def isLocalId(self, id): def setObjectZone(self, distObj, zoneId):
return ((id >= self.DOIDbase) and (id < self.DOIDlast)) """ Moves the object into the indicated zone. """
distObj.b_setLocation(0, zoneId)
assert distObj.zoneId == zoneId
# Tell all of the clients monitoring the new zone that we've
# arrived.
self.resendGenerate(distObj)
def sendSetLocation(self, doId, parentId, zoneId):
datagram = PyDatagram()
datagram.addUint16(OBJECT_SET_ZONE_CMU)
datagram.addUint32(doId)
datagram.addUint32(zoneId)
self.send(datagram)
def sendHeartbeat(self):
datagram = PyDatagram()
# Add message type
datagram.addUint16(CLIENT_HEARTBEAT_CMU)
# Send it!
self.send(datagram)
self.lastHeartbeat = globalClock.getRealTime()
# This is important enough to consider flushing immediately
# (particularly if we haven't run readerPollTask recently).
self.considerFlush()
def isLocalId(self, doId):
""" Returns true if this doId is one that we're the owner of,
false otherwise. """
return ((doId >= self.doIdBase) and (doId < self.doIdLast))
def haveCreateAuthority(self): def haveCreateAuthority(self):
return (self.DOIDlast > self.DOIDnext) """ Returns true if this client has been assigned a range of
doId's it may use to create objects, false otherwise. """
return (self.doIdLast > self.doIdBase)
def getAvatarIdFromSender(self):
""" Returns the doIdBase of the client that originally sent
the current update message. This is only defined when
processing an update message or a generate message. """
return self.currentSenderId
def handleDatagram(self, di): def handleDatagram(self, di):
if self.notify.getDebug(): if self.notify.getDebug():
@ -130,23 +268,22 @@ class ClientRepository(ClientRepositoryBase):
di.getDatagram().dumpHex(ostream) di.getDatagram().dumpHex(ostream)
msgType = self.getMsgType() msgType = self.getMsgType()
self.currentSenderId = None
# These are the sort of messages we may expect from the public # These are the sort of messages we may expect from the public
# Panda server. # Panda server.
if msgType == CLIENT_SET_DOID_RANGE: if msgType == SET_DOID_RANGE_CMU:
self.handleSetDOIDrange(di) self.handleSetDoIdrange(di)
elif msgType == CLIENT_CREATE_OBJECT_REQUIRED_RESP: elif msgType == OBJECT_GENERATE_CMU:
self.handleGenerateWithRequired(di) self.handleGenerate(di)
elif msgType == CLIENT_CREATE_OBJECT_REQUIRED_OTHER_RESP: elif msgType == OBJECT_UPDATE_FIELD_CMU:
self.handleGenerateWithRequiredOther(di)
elif msgType == CLIENT_OBJECT_UPDATE_FIELD_RESP:
self.handleUpdateField(di) self.handleUpdateField(di)
elif msgType == CLIENT_OBJECT_DELETE_RESP: elif msgType == OBJECT_DISABLE_CMU:
self.handleDelete(di)
elif msgType == CLIENT_OBJECT_DISABLE_RESP:
self.handleDisable(di) self.handleDisable(di)
elif msgType == CLIENT_REQUEST_GENERATES: elif msgType == OBJECT_DELETE_CMU:
self.handleDelete(di)
elif msgType == REQUEST_GENERATES_CMU:
self.handleRequestGenerates(di) self.handleRequestGenerates(di)
else: else:
self.handleMessageType(msgType, di) self.handleMessageType(msgType, di)
@ -156,57 +293,90 @@ class ClientRepository(ClientRepositoryBase):
self.considerHeartbeat() self.considerHeartbeat()
def handleMessageType(self, msgType, di): def handleMessageType(self, msgType, di):
self.notify.error("unrecognized message") self.notify.error("unrecognized message type %s" % (msgType))
def handleGenerateWithRequired(self, di): def handleUpdateField(self, di):
# Get the class Id # The CMU update message starts with an additional field, not
classId = di.getUint16() # present in the Disney update message: the doIdBase of the
# Get the DO Id # original sender. Extract that and call up to the parent.
self.currentSenderId = di.getUint32()
ClientRepositoryBase.handleUpdateField(self, di)
def handleDisable(self, di):
# Receives a list of doIds.
while di.getRemainingSize() > 0:
doId = di.getUint32() doId = di.getUint32()
# Look up the dclass
dclass = self.dclassesByNumber[classId]
dclass.startGenerate()
# Create a new distributed object, and put it in the dictionary
distObj = self.generateWithRequiredFields(dclass, doId, di)
dclass.stopGenerate()
def generateWithRequiredFields(self, dclass, doId, di): # We should never get a disable message for our own object.
assert not self.isLocalId(doId)
self.disableDoId(doId)
def handleDelete(self, di):
# Receives a single doId.
doId = di.getUint32()
self.deleteObject(doId)
def deleteObject(self, doId):
"""
Removes the object from the client's view of the world. This
should normally not be called directly except in the case of
error recovery, since the server will normally be responsible
for deleting and disabling objects as they go out of scope.
After this is called, future updates by server on this object
will be ignored (with a warning message). The object will
become valid again the next time the server sends a generate
message for this doId.
This is not a distributed message and does not delete the
object on the server or on any other client.
"""
if self.doId2do.has_key(doId): if self.doId2do.has_key(doId):
# ...it is in our dictionary. # If it is in the dictionary, remove it.
# Just update it. obj = self.doId2do[doId]
distObj = self.doId2do[doId] # Remove it from the dictionary
assert distObj.dclass == dclass del self.doId2do[doId]
distObj.generate() # Disable, announce, and delete the object itself...
distObj.updateRequiredFields(dclass, di) # unless delayDelete is on...
# updateRequiredFields calls announceGenerate obj.deleteOrDelay()
if self.isLocalId(doId):
self.freeDoId(doId)
elif self.cache.contains(doId): elif self.cache.contains(doId):
# ...it is in the cache. # If it is in the cache, remove it.
# Pull it out of the cache: self.cache.delete(doId)
distObj = self.cache.retrieve(doId) if self.isLocalId(doId):
assert distObj.dclass == dclass self.freeDoId(doId)
# put it in the dictionary:
self.doId2do[doId] = distObj
# and update it.
distObj.generate()
distObj.updateRequiredFields(dclass, di)
# updateRequiredFields calls announceGenerate
else: else:
# ...it is not in the dictionary or the cache. # Otherwise, ignore it
# Construct a new one self.notify.warning(
classDef = dclass.getClassDef() "Asked to delete non-existent DistObj " + str(doId))
if classDef == None:
self.notify.error("Could not create an undefined %s object." % ( def sendUpdate(self, distObj, fieldName, args):
dclass.getName())) """ Sends a normal update for a single field. """
distObj = classDef(self) dg = distObj.dclass.clientFormatUpdate(
distObj.dclass = dclass fieldName, distObj.doId, args)
# Assign it an Id self.send(dg)
distObj.doId = doId
# Put the new do in the dictionary def sendUpdateToChannel(self, distObj, channelId, fieldName, args):
self.doId2do[doId] = distObj
# Update the required fields """ Sends a targeted update of a single field to a particular
distObj.generateInit() # Only called when constructed client. The top 32 bits of channelId is ignored; the lower 32
distObj._retrieveCachedData() bits should be the client Id of the recipient (i.e. the
distObj.generate() client's doIdbase). The field update will be sent to the
distObj.updateRequiredFields(dclass, di) indicated client only. The field must be marked clsend or
# updateRequiredFields calls announceGenerate p2p, and may not be marked broadcast. """
return distObj
datagram = distObj.dclass.clientFormatUpdate(
fieldName, distObj.doId, args)
dgi = PyDatagramIterator(datagram)
# Reformat the packed datagram to change the message type and
# add the target id.
dgi.getUint16()
dg = PyDatagram()
dg.addUint16(CLIENT_OBJECT_UPDATE_FIELD_TARGETED_CMU)
dg.addUint32(channelId & 0xffffffff)
dg.appendData(dgi.getRemainingBytes())
self.send(dg)

View File

@ -25,9 +25,9 @@ class ClientRepositoryBase(ConnectionRepository):
""" """
notify = DirectNotifyGlobal.directNotify.newCategory("ClientRepositoryBase") notify = DirectNotifyGlobal.directNotify.newCategory("ClientRepositoryBase")
def __init__(self, dcFileNames = None): def __init__(self, dcFileNames = None, dcSuffix = ''):
self.dcSuffix=""
ConnectionRepository.__init__(self, ConnectionRepository.CM_HTTP, base.config, hasOwnerView=True) ConnectionRepository.__init__(self, ConnectionRepository.CM_HTTP, base.config, hasOwnerView=True)
self.dcSuffix = dcSuffix
if hasattr(self, 'setVerbose'): if hasattr(self, 'setVerbose'):
if self.config.GetBool('verbose-clientrepository'): if self.config.GetBool('verbose-clientrepository'):
self.setVerbose(1) self.setVerbose(1)
@ -81,7 +81,7 @@ class ClientRepositoryBase(ConnectionRepository):
if self.deferredGenerates: if self.deferredGenerates:
taskMgr.remove('deferredGenerate') taskMgr.remove('deferredGenerate')
taskMgr.doMethodLater(self.deferInterval, self.__doDeferredGenerate, 'deferredGenerate') taskMgr.doMethodLater(self.deferInterval, self.doDeferredGenerate, 'deferredGenerate')
## def queryObjectAll(self, doID, context=0): ## def queryObjectAll(self, doID, context=0):
## """ ## """
@ -117,18 +117,6 @@ class ClientRepositoryBase(ConnectionRepository):
# we might get a list of message names, use the first one # we might get a list of message names, use the first one
return makeList(MsgId2Names.get(msgId, 'UNKNOWN MESSAGE: %s' % msgId))[0] return makeList(MsgId2Names.get(msgId, 'UNKNOWN MESSAGE: %s' % msgId))[0]
def sendDisconnect(self):
if self.isConnected():
# Tell the game server that we're going:
datagram = PyDatagram()
# Add message type
datagram.addUint16(CLIENT_DISCONNECT)
# Send the message
self.send(datagram)
self.notify.info("Sent disconnect message to server")
self.disconnect()
self.stopHeartbeat()
def allocateContext(self): def allocateContext(self):
self.context+=1 self.context+=1
return self.context return self.context
@ -162,67 +150,7 @@ class ClientRepositoryBase(ConnectionRepository):
""" """
return time.time() + self.serverDelta return time.time() + self.serverDelta
def handleGenerateWithRequired(self, di): def doGenerate(self, parentId, zoneId, classId, doId, di):
parentId = di.getUint32()
zoneId = di.getUint32()
assert parentId == self.GameGlobalsId or parentId in self.doId2do
# Get the class Id
classId = di.getUint16()
# Get the DO Id
doId = di.getUint32()
# Look up the dclass
dclass = self.dclassesByNumber[classId]
dclass.startGenerate()
# Create a new distributed object, and put it in the dictionary
distObj = self.generateWithRequiredFields(dclass, doId, di, parentId, zoneId)
dclass.stopGenerate()
def handleGenerateWithRequiredOther(self, di):
parentId = di.getUint32()
zoneId = di.getUint32()
# Get the class Id
classId = di.getUint16()
# Get the DO Id
doId = di.getUint32()
dclass = self.dclassesByNumber[classId]
deferrable = getattr(dclass.getClassDef(), 'deferrable', False)
if not self.deferInterval or self.noDefer:
deferrable = False
now = globalClock.getFrameTime()
if self.deferredGenerates or deferrable:
# This object is deferrable, or there are already deferred
# objects in the queue (so all objects have to be held
# up).
if self.deferredGenerates or now - self.lastGenerate < self.deferInterval:
# Queue it for later.
assert(self.notify.debug("deferring generate for %s %s" % (dclass.getName(), doId)))
self.deferredGenerates.append((CLIENT_CREATE_OBJECT_REQUIRED_OTHER, doId))
# Keep a copy of the datagram, and move the di to the copy
dg = Datagram(di.getDatagram())
di = DatagramIterator(dg, di.getCurrentIndex())
self.deferredDoIds[doId] = ((parentId, zoneId, classId, doId, di), deferrable, dg, [])
if len(self.deferredGenerates) == 1:
# We just deferred the first object on the queue;
# start the task to generate it.
taskMgr.remove('deferredGenerate')
taskMgr.doMethodLater(self.deferInterval, self.__doDeferredGenerate, 'deferredGenerate')
else:
# We haven't generated any deferrable objects in a
# while, so it's safe to go ahead and generate this
# one immediately.
self.lastGenerate = now
self.__doGenerate(parentId, zoneId, classId, doId, di)
else:
self.__doGenerate(parentId, zoneId, classId, doId, di)
def __doGenerate(self, parentId, zoneId, classId, doId, di):
# Look up the dclass # Look up the dclass
assert parentId == self.GameGlobalsId or parentId in self.doId2do assert parentId == self.GameGlobalsId or parentId in self.doId2do
dclass = self.dclassesByNumber[classId] dclass = self.dclassesByNumber[classId]
@ -252,7 +180,7 @@ class ClientRepositoryBase(ConnectionRepository):
if doId in self.deferredDoIds: if doId in self.deferredDoIds:
args, deferrable, dg, updates = self.deferredDoIds[doId] args, deferrable, dg, updates = self.deferredDoIds[doId]
del self.deferredDoIds[doId] del self.deferredDoIds[doId]
self.__doGenerate(*args) self.doGenerate(*args)
if deferrable: if deferrable:
self.lastGenerate = globalClock.getFrameTime() self.lastGenerate = globalClock.getFrameTime()
@ -272,7 +200,7 @@ class ClientRepositoryBase(ConnectionRepository):
else: else:
self.notify.warning("Ignoring deferred message %s" % (msgType)) self.notify.warning("Ignoring deferred message %s" % (msgType))
def __doDeferredGenerate(self, task): def doDeferredGenerate(self, task):
""" This is the task that generates an object on the deferred """ This is the task that generates an object on the deferred
queue. """ queue. """
@ -290,51 +218,6 @@ class ClientRepositoryBase(ConnectionRepository):
# All objects are generaetd. # All objects are generaetd.
return Task.done return Task.done
def handleGenerateWithRequiredOtherOwner(self, di):
# Get the class Id
classId = di.getUint16()
# Get the DO Id
doId = di.getUint32()
# parentId and zoneId are not relevant here
parentId = di.getUint32()
zoneId = di.getUint32()
# Look up the dclass
dclass = self.dclassesByNumber[classId]
dclass.startGenerate()
# Create a new distributed object, and put it in the dictionary
distObj = self.generateWithRequiredOtherFieldsOwner(dclass, doId, di)
dclass.stopGenerate()
def handleQuietZoneGenerateWithRequired(self, di):
# Special handler for quiet zone generates -- we need to filter
parentId = di.getUint32()
zoneId = di.getUint32()
assert parentId in self.doId2do
# Get the class Id
classId = di.getUint16()
# Get the DO Id
doId = di.getUint32()
# Look up the dclass
dclass = self.dclassesByNumber[classId]
dclass.startGenerate()
distObj = self.generateWithRequiredFields(dclass, doId, di, parentId, zoneId)
dclass.stopGenerate()
def handleQuietZoneGenerateWithRequiredOther(self, di):
# Special handler for quiet zone generates -- we need to filter
parentId = di.getUint32()
zoneId = di.getUint32()
assert parentId in self.doId2do
# Get the class Id
classId = di.getUint16()
# Get the DO Id
doId = di.getUint32()
# Look up the dclass
dclass = self.dclassesByNumber[classId]
dclass.startGenerate()
distObj = self.generateWithRequiredOtherFields(dclass, doId, di, parentId, zoneId)
dclass.stopGenerate()
def generateWithRequiredFields(self, dclass, doId, di, parentId, zoneId): def generateWithRequiredFields(self, dclass, doId, di, parentId, zoneId):
if self.doId2do.has_key(doId): if self.doId2do.has_key(doId):
# ...it is in our dictionary. # ...it is in our dictionary.
@ -471,12 +354,6 @@ class ClientRepositoryBase(ConnectionRepository):
return distObj return distObj
def handleDisable(self, di, ownerView=False):
# Get the DO Id
doId = di.getUint32()
# disable it.
self.disableDoId(doId, ownerView)
def disableDoId(self, doId, ownerView=False): def disableDoId(self, doId, ownerView=False):
table, cache = self.getTables(ownerView) table, cache = self.getTables(ownerView)
# Make sure the object exists # Make sure the object exists
@ -518,7 +395,7 @@ class ClientRepositoryBase(ConnectionRepository):
" is not in dictionary, ownerView=%s" % ownerView) " is not in dictionary, ownerView=%s" % ownerView)
def handleDelete(self, di): def handleDelete(self, di):
# overridden by ToontownClientRepository # overridden by ClientRepository
assert 0 assert 0
def handleUpdateField(self, di): def handleUpdateField(self, di):
@ -653,25 +530,6 @@ class ClientRepositoryBase(ConnectionRepository):
return doDict return doDict
def sendSetLocation(self, doId, parentId, zoneId):
datagram = PyDatagram()
datagram.addUint16(CLIENT_OBJECT_LOCATION)
datagram.addUint32(doId)
datagram.addUint32(parentId)
datagram.addUint32(zoneId)
self.send(datagram)
def sendHeartbeat(self):
datagram = PyDatagram()
# Add message type
datagram.addUint16(CLIENT_HEARTBEAT)
# Send it!
self.send(datagram)
self.lastHeartbeat = globalClock.getRealTime()
# This is important enough to consider flushing immediately
# (particularly if we haven't run readerPollTask recently).
self.considerFlush()
def considerHeartbeat(self): def considerHeartbeat(self):
"""Send a heartbeat message if we haven't sent one recently.""" """Send a heartbeat message if we haven't sent one recently."""
if not self.heartbeatStarted: if not self.heartbeatStarted:

View File

@ -22,6 +22,7 @@ class ConnectionRepository(
""" """
notify = DirectNotifyGlobal.directNotify.newCategory("ConnectionRepository") notify = DirectNotifyGlobal.directNotify.newCategory("ConnectionRepository")
taskPriority = -30 taskPriority = -30
taskChain = None
CM_HTTP=0 CM_HTTP=0
CM_NET=1 CM_NET=1
@ -569,7 +570,7 @@ class ConnectionRepository(
self.accept(CConnectionRepository.getOverflowEventName(), self.accept(CConnectionRepository.getOverflowEventName(),
self.handleReaderOverflow) self.handleReaderOverflow)
taskMgr.add(self.readerPollUntilEmpty, self.uniqueName("readerPollTask"), taskMgr.add(self.readerPollUntilEmpty, self.uniqueName("readerPollTask"),
priority = self.taskPriority, taskChain = 'net') priority = self.taskPriority, taskChain = self.taskChain)
def stopReaderPollTask(self): def stopReaderPollTask(self):
taskMgr.remove(self.uniqueName("readerPollTask")) taskMgr.remove(self.uniqueName("readerPollTask"))
@ -611,7 +612,7 @@ class ConnectionRepository(
# Zero-length datagrams might freak out the server. No point # Zero-length datagrams might freak out the server. No point
# in sending them, anyway. # in sending them, anyway.
if datagram.getLength() > 0: if datagram.getLength() > 0:
if ConnectionRepository.notify.getDebug(): if self.notify.getDebug():
print "ConnectionRepository sending datagram:" print "ConnectionRepository sending datagram:"
datagram.dumpHex(ostream) datagram.dumpHex(ostream)

View File

@ -1,4 +1,3 @@
from otp.ai.AIBaseGlobal import *
from pandac.PandaModules import NodePath from pandac.PandaModules import NodePath
import DistributedObjectAI import DistributedObjectAI
import GridParent import GridParent

View File

@ -438,15 +438,21 @@ class DistributedObject(DistributedObjectBase):
# should call doneBarrier(), which will send the context # should call doneBarrier(), which will send the context
# number back to the AI. # number back to the AI.
for context, name, avIds in data: for context, name, avIds in data:
if base.localAvatar.doId in avIds: for avId in avIds:
# We found localToon's id; stop here. if self.cr.isLocalId(avId):
# We found the local avatar's id; stop here.
self.__barrierContext = (context, name) self.__barrierContext = (context, name)
assert self.notify.debug('setBarrierData(%s, %s)' % (context, name)) assert self.notify.debug('setBarrierData(%s, %s)' % (context, name))
return return
# This barrier didn't involve this client; ignore it.
assert self.notify.debug('setBarrierData(%s)' % (None)) assert self.notify.debug('setBarrierData(%s)' % (None))
self.__barrierContext = None self.__barrierContext = None
def getBarrierData(self):
# Return a trivially-empty (context, name, avIds) value.
return ((0, '', []),)
def doneBarrier(self, name = None): def doneBarrier(self, name = None):
# Tells the AI we have finished handling our task. If the # Tells the AI we have finished handling our task. If the
# optional name parameter is specified, it must match the # optional name parameter is specified, it must match the
@ -508,9 +514,6 @@ class DistributedObject(DistributedObjectBase):
# avatar class overrides this to return true. # avatar class overrides this to return true.
return self.cr and self.cr.isLocalId(self.doId) return self.cr and self.cr.isLocalId(self.doId)
def updateZone(self, zoneId):
self.cr.sendUpdateZone(self, zoneId)
def isGridParent(self): def isGridParent(self):
# If this distributed object is a DistributedGrid return 1. 0 by default # If this distributed object is a DistributedGrid return 1. 0 by default
return 0 return 0

View File

@ -3,7 +3,6 @@
from direct.directnotify.DirectNotifyGlobal import directNotify from direct.directnotify.DirectNotifyGlobal import directNotify
from direct.distributed.DistributedObjectBase import DistributedObjectBase from direct.distributed.DistributedObjectBase import DistributedObjectBase
from direct.showbase import PythonUtil from direct.showbase import PythonUtil
from otp.ai.AIZoneData import AIZoneData
from pandac.PandaModules import * from pandac.PandaModules import *
#from PyDatagram import PyDatagram #from PyDatagram import PyDatagram
#from PyDatagramIterator import PyDatagramIterator #from PyDatagramIterator import PyDatagramIterator
@ -303,6 +302,7 @@ class DistributedObjectAI(DistributedObjectBase):
# setLocation destroys self._zoneData if we move away to # setLocation destroys self._zoneData if we move away to
# a different zone # a different zone
if self._zoneData is None: if self._zoneData is None:
from otp.ai.AIZoneData import AIZoneData
self._zoneData = AIZoneData(self.air, self.parentId, self.zoneId) self._zoneData = AIZoneData(self.air, self.parentId, self.zoneId)
return self._zoneData return self._zoneData
@ -334,9 +334,7 @@ class DistributedObjectAI(DistributedObjectBase):
def sendUpdate(self, fieldName, args = []): def sendUpdate(self, fieldName, args = []):
assert self.notify.debugStateCall(self) assert self.notify.debugStateCall(self)
if self.air: if self.air:
dg = self.dclass.aiFormatUpdate( self.air.sendUpdate(self, fieldName, args)
fieldName, self.doId, self.doId, self.air.ourChannel, args)
self.air.sendDatagram(dg)
def GetPuppetConnectionChannel(self, doId): def GetPuppetConnectionChannel(self, doId):
return doId + (1L << 32) return doId + (1L << 32)
@ -510,14 +508,14 @@ class DistributedObjectAI(DistributedObjectBase):
self.__barriers[context] = barrier self.__barriers[context] = barrier
# Send the context number to each involved client. # Send the context number to each involved client.
self.sendUpdate("setBarrierData", [self.__getBarrierData()]) self.sendUpdate("setBarrierData", [self.getBarrierData()])
else: else:
# No avatars; just call the callback immediately. # No avatars; just call the callback immediately.
callback(avIds) callback(avIds)
return context return context
def __getBarrierData(self): def getBarrierData(self):
# Returns the barrier data formatted for sending to the # Returns the barrier data formatted for sending to the
# clients. This lists all of the current outstanding barriers # clients. This lists all of the current outstanding barriers
# and the avIds waiting for them. # and the avIds waiting for them.
@ -567,3 +565,7 @@ class DistributedObjectAI(DistributedObjectBase):
def execCommand(self, string, mwMgrId, avId, zoneId): def execCommand(self, string, mwMgrId, avId, zoneId):
pass pass
def _retrieveCachedData(self):
""" This is a no-op on the AI. """
pass

View File

@ -379,6 +379,25 @@ class DistributedSmoothNode(DistributedNode.DistributedNode,
self.smoother.getLatestPosition(): self.smoother.getLatestPosition():
self.smoother.applySmoothPosHpr(self, self) self.smoother.applySmoothPosHpr(self, self)
# These are all required by the CMU server, which requires get* to
# match set* in more cases than the Disney server does.
def getComponentL(self):
return self.zoneId
def getComponentX(self):
return self.getX()
def getComponentY(self):
return self.getY()
def getComponentZ(self):
return self.getZ()
def getComponentH(self):
return self.getH()
def getComponentP(self):
return self.getP()
def getComponentR(self):
return self.getR()
def getComponentT(self):
return 0
@report(types = ['args'], dConfigParam = 'smoothnode') @report(types = ['args'], dConfigParam = 'smoothnode')
def clearSmoothing(self, bogus = None): def clearSmoothing(self, bogus = None):
# Call this to invalidate all the old position reports # Call this to invalidate all the old position reports

View File

@ -1,4 +1,3 @@
from otp.ai.AIBaseGlobal import *
import DistributedNodeAI import DistributedNodeAI
import DistributedSmoothNodeBase import DistributedSmoothNodeBase

View File

@ -23,6 +23,9 @@ class DoCollectionManager:
def getDo(self, doId): def getDo(self, doId):
return self.doId2do.get(doId) return self.doId2do.get(doId)
def getGameDoId(self):
return self.GameGlobalsId
def callbackWithDo(self, doId, callback): def callbackWithDo(self, doId, callback):
do = self.doId2do.get(doId) do = self.doId2do.get(doId)
if do is not None: if do is not None:
@ -337,14 +340,14 @@ class DoCollectionManager:
parentObj = self.doId2do.get(parentId) parentObj = self.doId2do.get(parentId)
if parentObj is not None: if parentObj is not None:
parentObj.handleChildArrive(object, zoneId) parentObj.handleChildArrive(object, zoneId)
elif parentId not in (0, self.getGameDoId()): elif parentId not in (None, 0, self.getGameDoId()):
self.notify.warning('storeObjectLocation(%s): parent %s not present' % self.notify.warning('storeObjectLocation(%s): parent %s not present' %
(object.doId, parentId)) (object.doId, parentId))
elif oldZoneId != zoneId: elif oldZoneId != zoneId:
parentObj = self.doId2do.get(parentId) parentObj = self.doId2do.get(parentId)
if parentObj is not None: if parentObj is not None:
parentObj.handleChildArriveZone(object, zoneId) parentObj.handleChildArriveZone(object, zoneId)
elif parentId not in (0, self.getGameDoId()): elif parentId not in (None, 0, self.getGameDoId()):
self.notify.warning('storeObjectLocation(%s): parent %s not present' % self.notify.warning('storeObjectLocation(%s): parent %s not present' %
(object.doId, parentId)) (object.doId, parentId))

View File

@ -88,6 +88,13 @@ MsgName2Id = {
'CLIENT_SYSTEMMESSAGE_AKNOWLEDGE': 123, 'CLIENT_SYSTEMMESSAGE_AKNOWLEDGE': 123,
'CLIENT_CHANGE_GENERATE_ORDER': 124, 'CLIENT_CHANGE_GENERATE_ORDER': 124,
'STATESERVER_OBJECT_GENERATE_WITH_REQUIRED': 2001,
'STATESERVER_OBJECT_GENERATE_WITH_REQUIRED_OTHER': 2003,
'STATESERVER_OBJECT_UPDATE_FIELD': 2004,
'STATESERVER_OBJECT_CREATE_WITH_REQUIRED_CONTEXT': 2050,
'STATESERVER_OBJECT_CREATE_WITH_REQUIR_OTHER_CONTEXT': 2051,
'STATESERVER_BOUNCE_MESSAGE': 2086,
} }
# create id->name table for debugging # create id->name table for debugging

View File

@ -0,0 +1,31 @@
""" MsgTypesCMU module: defines the various message type codes as used
by the CMU ServerRepository/ClientRepository code in this directory.
It replaces the MsgTypes module, which is not used by the CMU
implementation. """
from direct.showbase.PythonUtil import invertDictLossless
MsgName2Id = {
'SET_DOID_RANGE_CMU' : 9001,
'CLIENT_OBJECT_GENERATE_CMU' : 9002,
'OBJECT_GENERATE_CMU' : 9003,
'OBJECT_UPDATE_FIELD_CMU' : 9004,
'OBJECT_DISABLE_CMU' : 9005,
'OBJECT_DELETE_CMU' : 9006,
'REQUEST_GENERATES_CMU' : 9007,
'CLIENT_DISCONNECT_CMU' : 9008,
'CLIENT_SET_INTEREST_CMU' : 9009,
'OBJECT_SET_ZONE_CMU' : 9010,
'CLIENT_HEARTBEAT_CMU' : 9011,
'CLIENT_OBJECT_UPDATE_FIELD_TARGETED_CMU' : 9011,
'CLIENT_OBJECT_UPDATE_FIELD' : 24, # Matches MsgTypes.CLIENT_OBJECT_UPDATE_FIELD
}
# create id->name table for debugging
MsgId2Names = invertDictLossless(MsgName2Id)
# put msg names in module scope, assigned to msg value
for name, value in MsgName2Id.items():
exec '%s = %s' % (name, value)
del name, value

View File

@ -2,7 +2,7 @@
from pandac.PandaModules import * from pandac.PandaModules import *
#from TaskManagerGlobal import * #from TaskManagerGlobal import *
from direct.distributed.MsgTypes import * from direct.distributed.MsgTypesCMU import *
from direct.task import Task from direct.task import Task
from direct.directnotify import DirectNotifyGlobal from direct.directnotify import DirectNotifyGlobal
from direct.distributed.PyDatagram import PyDatagram from direct.distributed.PyDatagram import PyDatagram
@ -15,34 +15,129 @@ class ServerRepository:
""" This maintains the server-side connection with a Panda server. """ This maintains the server-side connection with a Panda server.
It is only for use with the Panda LAN server provided by CMU.""" It is only for use with the Panda LAN server provided by CMU."""
notify = DirectNotifyGlobal.directNotify.newCategory("ClientRepository") notify = DirectNotifyGlobal.directNotify.newCategory("ServerRepository")
def __init__(self, tcpPort, udpPort, dcFileNames = None): class Client:
""" This internal class keeps track of the data associated
with each connected client. """
def __init__(self, connection, netAddress, doIdBase):
# The connection used to communicate with the client.
self.connection = connection
# The net address to the client, including IP address.
# Used for reporting purposes only.
self.netAddress = netAddress
# The first doId in the range assigned to the client.
# This also serves as a unique numeric ID for this client.
# (It is sometimes called "avatarId" in some update
# messages, even though the client is not required to use
# this particular number as an avatar ID.)
self.doIdBase = doIdBase
# The set of zoneIds that the client explicitly has
# interest in. The client will receive updates for all
# distributed objects appearing in one of these zones.
# (The client will also receive updates for all zones in
# which any one of the distributed obejcts that it has
# created still exist.)
self.explicitInterestZoneIds = set()
# The set of interest zones sent to the client at the last
# update. This is the actual set of zones the client is
# informed of. Changing the explicitInterestZoneIds,
# above, creating or deleting objects in different zones,
# or moving objects between zones, might influence this
# set.
self.currentInterestZoneIds = set()
# A dictionary of doId -> Object, for distributed objects
# currently in existence that were created by the client.
self.objectsByDoId = {}
# A dictionary of zoneId -> set([Object]), listing the
# distributed objects assigned to each zone, of the
# objects created by this client.
self.objectsByZoneId = {}
class Object:
""" This internal class keeps track of the data associated
with each extent distributed object. """
def __init__(self, doId, zoneId, dclass):
# The object's distributed ID.
self.doId = doId
# The object's current zone. Each object is associated
# with only one zone.
self.zoneId = zoneId
# The object's class type.
self.dclass = dclass
# Note that the server does not store any other data about
# the distributed objects; in particular, it doesn't
# record its current fields. That is left to the clients.
def __init__(self, tcpPort, udpPort = None, dcFileNames = None):
# Set up networking interfaces.
self.qcm = QueuedConnectionManager() self.qcm = QueuedConnectionManager()
self.qcl = QueuedConnectionListener(self.qcm, 0) self.qcl = QueuedConnectionListener(self.qcm, 0)
self.qcr = QueuedConnectionReader(self.qcm, 0) self.qcr = QueuedConnectionReader(self.qcm, 0)
self.cw = ConnectionWriter(self.qcm, 0) self.cw = ConnectionWriter(self.qcm, 0)
self.tcpRendezvous = self.qcm.openTCPServerRendezvous(tcpPort, 10) self.tcpRendezvous = self.qcm.openTCPServerRendezvous(tcpPort, 10)
print self.tcpRendezvous
self.qcl.addConnection(self.tcpRendezvous) self.qcl.addConnection(self.tcpRendezvous)
taskMgr.add(self.listenerPoll, "serverListenerPollTask") taskMgr.add(self.listenerPoll, "serverListenerPollTask")
taskMgr.add(self.readerPollUntilEmpty, "serverReaderPollTask") taskMgr.add(self.readerPollUntilEmpty, "serverReaderPollTask")
taskMgr.add(self.clientHardDisconnectTask, "clientHardDisconnect") taskMgr.add(self.clientHardDisconnectTask, "clientHardDisconnect")
self.ClientIP = {}
self.ClientZones = {} # A set of clients that have recently been written to and may
self.ClientDOIDbase = {} # need to be flushed.
self.ClientObjects = {} self.needsFlush = set()
self.DOIDnext = 1
self.DOIDrange = 1000000 collectTcpInterval = ConfigVariableDouble('collect-tcp-interval').getValue()
self.DOIDtoClient = {} taskMgr.doMethodLater(collectTcpInterval, self.flushTask, 'flushTask')
self.DOIDtoZones = {}
self.DOIDtoDClass = {} # A dictionary of connection -> Client object, tracking all of
self.ZonesToClients = {} # the clients we currently have connected.
self.ZonetoDOIDs = {} self.clientsByConnection = {}
# A similar dictionary of doIdBase -> Client object, indexing
# by the client's doIdBase number instead.
self.clientsByDoIdBase = {}
# A dictionary of zoneId -> set([Client]), listing the clients
# that have an interest in each zoneId.
self.zonesToClients = {}
# A dictionary of zoneId -> set([Object]), listing the
# distributed objects assigned to each zone, globally.
self.objectsByZoneId = {}
# The number of doId's to assign to each client. Must remain
# constant during server lifetime.
self.doIdRange = base.config.GetInt('server-doid-range', 1000000)
# An allocator object that assigns the next doIdBase to each
# client.
self.idAllocator = UniqueIdAllocator(0, 0xffffffff / self.doIdRange)
self.dcFile = DCFile() self.dcFile = DCFile()
self.dcSuffix = '' self.dcSuffix = ''
self.readDCFile(dcFileNames) self.readDCFile(dcFileNames)
def flushTask(self, task):
""" This task is run periodically to flush any connections
that might need it. It's only necessary in cases where
collect-tcp is set true (if this is false, messages are sent
immediately and do not require periodic flushing). """
for client in self.needsFlush:
client.connection.flush()
self.needsFlush = set()
return task.again
def importModule(self, dcImports, moduleName, importSymbols): def importModule(self, dcImports, moduleName, importSymbols):
""" Imports the indicated moduleName and all of its symbols """ Imports the indicated moduleName and all of its symbols
into the current namespace. This more-or-less reimplements into the current namespace. This more-or-less reimplements
@ -142,7 +237,7 @@ class ServerRepository:
classDef = dcImports.get(className) classDef = dcImports.get(className)
if classDef == None: if classDef == None:
self.notify.info("No class definition for %s." % (className)) self.notify.debug("No class definition for %s." % (className))
else: else:
if type(classDef) == types.ModuleType: if type(classDef) == types.ModuleType:
if not hasattr(classDef, className): if not hasattr(classDef, className):
@ -168,31 +263,38 @@ class ServerRepository:
newConnection = PointerToConnection() newConnection = PointerToConnection()
retVal = self.qcl.getNewConnection(rendezvous, netAddress, retVal = self.qcl.getNewConnection(rendezvous, netAddress,
newConnection) newConnection)
if retVal: if not retVal:
return
# Crazy dereferencing # Crazy dereferencing
newConnection=newConnection.p() newConnection=newConnection.p()
self.qcr.addConnection(newConnection) self.qcr.addConnection(newConnection)
# Add clients infomation to dictionary
self.ClientIP[newConnection] = netAddress.getIpString() # Add clients information to dictionary
self.ClientZones[newConnection] = [] id = self.idAllocator.allocate()
self.ClientObjects[newConnection] = [] doIdBase = id * self.doIdRange + 1
self.notify.info(
"Got client %s from %s" % (doIdBase, netAddress))
client = self.Client(newConnection, netAddress, doIdBase)
self.clientsByConnection[client.connection] = client
self.clientsByDoIdBase[client.doIdBase] = client
self.lastConnection = newConnection self.lastConnection = newConnection
self.sendDOIDrange(self.lastConnection) self.sendDoIdRange(client)
else:
self.notify.warning(
"getNewConnection returned false")
return Task.cont return Task.cont
# continuously polls for new messages on the server
def readerPollUntilEmpty(self, task): def readerPollUntilEmpty(self, task):
""" continuously polls for new messages on the server """
while self.readerPollOnce(): while self.readerPollOnce():
pass pass
return Task.cont return Task.cont
# checks for available messages to the server
def readerPollOnce(self): def readerPollOnce(self):
""" checks for available messages to the server """
availGetVal = self.qcr.dataAvailable() availGetVal = self.qcr.dataAvailable()
if availGetVal: if availGetVal:
datagram = NetDatagram() datagram = NetDatagram()
@ -200,212 +302,388 @@ class ServerRepository:
if readRetVal: if readRetVal:
# need to send to message processing unit # need to send to message processing unit
self.handleDatagram(datagram) self.handleDatagram(datagram)
else:
self.notify.warning("getData returned false")
return availGetVal return availGetVal
# switching station for messages
def handleDatagram(self, datagram): def handleDatagram(self, datagram):
""" switching station for messages """
client = self.clientsByConnection.get(datagram.getConnection())
if self.notify.getDebug():
self.notify.debug(
"ServerRepository received datagram from %s:" % (client.doIdBase))
datagram.dumpHex(ostream)
if not client:
# This shouldn't be possible.
self.notify.error(
"Received datagram from unknown connection.")
dgi = DatagramIterator(datagram) dgi = DatagramIterator(datagram)
type = dgi.getUint16() type = dgi.getUint16()
if type == CLIENT_DISCONNECT: if type == CLIENT_DISCONNECT_CMU:
self.handleClientDisconnect(datagram.getConnection()) self.handleClientDisconnect(client)
elif type == CLIENT_SET_ZONE_CMU: elif type == CLIENT_SET_INTEREST_CMU:
self.handleSetZone(dgi, datagram.getConnection()) self.handleClientSetInterest(client, dgi)
elif type == CLIENT_REMOVE_ZONE: elif type == CLIENT_OBJECT_GENERATE_CMU:
self.handleRemoveZone(dgi, datagram.getConnection()) self.handleClientCreateObject(datagram, dgi)
elif type == CLIENT_CREATE_OBJECT_REQUIRED:
self.handleClientCreateObjectRequired(datagram, dgi)
elif type == CLIENT_OBJECT_UPDATE_FIELD: elif type == CLIENT_OBJECT_UPDATE_FIELD:
self.handleClientUpdateField(datagram, dgi) self.handleClientObjectUpdateField(datagram, dgi)
elif type == CLIENT_OBJECT_DELETE: elif type == CLIENT_OBJECT_UPDATE_FIELD_TARGETED_CMU:
self.handleClientObjectUpdateField(datagram, dgi, targeted = True)
elif type == OBJECT_DELETE_CMU:
self.handleClientDeleteObject(datagram, dgi.getUint32()) self.handleClientDeleteObject(datagram, dgi.getUint32())
elif type == CLIENT_OBJECT_DISABLE: elif type == OBJECT_SET_ZONE_CMU:
self.handleClientDisable(datagram, dgi.getUint32()) self.handleClientObjectSetZone(datagram, dgi)
else: else:
self.handleMessageType(type, dgi) self.handleMessageType(type, dgi)
def handleMessageType(self, msgType, di): def handleMessageType(self, msgType, di):
self.notify.error("unrecognized message") self.notify.warning("unrecognized message type %s" % (msgType))
# client wants to create an object, so we store appropriate data, def handleClientCreateObject(self, datagram, dgi):
# and then pass message along to corresponding zones """ client wants to create an object, so we store appropriate
data, and then pass message along to corresponding zones """
def handleClientCreateObjectRequired(self, datagram, dgi):
connection = datagram.getConnection() connection = datagram.getConnection()
# no need to create a new message, just forward the received zoneId = dgi.getUint32()
# message as it has the same msg type number classId = dgi.getUint16()
zone = dgi.getUint32() doId = dgi.getUint32()
classid = dgi.getUint16()
doid = dgi.getUint32()
rest = dgi.getRemainingBytes()
datagram = NetDatagram()
datagram.addUint16(CLIENT_CREATE_OBJECT_REQUIRED)
datagram.addUint16(classid)
datagram.addUint32(doid)
datagram.appendData(rest)
dclass = self.dclassesByNumber[classid]
if self.ClientObjects[connection].count(doid) == 0:
self.ClientObjects[connection].append(doid)
self.DOIDtoZones[doid] = zone
self.DOIDtoDClass[doid] = dclass
if zone in self.ZonetoDOIDs:
if self.ZonetoDOIDs[zone].count(doid)==0:
self.ZonetoDOIDs[zone].append(doid)
else:
self.ZonetoDOIDs[zone] = [doid]
self.sendToZoneExcept(zone, datagram, connection)
client = self.clientsByConnection[connection]
# client wants to update an object, forward message along if self.getDoIdBase(doId) != client.doIdBase:
# to corresponding zone self.notify.warning(
"Ignoring attempt to create invalid doId %s from client %s" % (doId, client.doIdBase))
def handleClientUpdateField(self, datagram, dgi):
connection = datagram.getConnection()
doid = dgi.getUint32()
fieldid = dgi.getUint16()
dclass = self.DOIDtoDClass[doid]
dcfield = dclass.getFieldByIndex(fieldid)
if dcfield == None:
self.notify.error(
"Received update for field %s on object %s; no such field for class %s." % (
fieldid, doid, dclass.getName()))
return return
if (dcfield.hasKeyword('broadcast')):
if (dcfield.hasKeyword('p2p')): dclass = self.dclassesByNumber[classId]
self.sendToZoneExcept(self.DOIDtoZones[doid], datagram, 0)
object = client.objectsByDoId.get(doId)
if object:
# This doId is already in use; thus, this message is
# really just an update.
if object.dclass != dclass:
self.notify.warning(
"Ignoring attempt to change object %s from %s to %s by client %s" % (
doId, object.dclass.getName(), dclass.getName(), client.doIdBase))
return
self.setObjectZone(client, object, zoneId)
else: else:
self.sendToZoneExcept(self.DOIDtoZones[doid], datagram, connection) if self.notify.getDebug():
elif (dcfield.hasKeyword('p2p')): self.notify.debug(
doidbase = (doid / self.DOIDrange) * self.DOIDrange "Creating object %s of type %s by client %s" % (
self.cw.send(datagram, self.DOIDtoClient[doidbase]) doId, dclass.getName(), client.doIdBase))
object = self.Object(doId, zoneId, dclass)
client.objectsByDoId[doId] = object
client.objectsByZoneId.setdefault(zoneId, set()).add(object)
self.objectsByZoneId.setdefault(zoneId, set()).add(object)
self.updateClientInterestZones(client)
# Rebuild the new datagram that we'll send on. We shim in the
# doIdBase of the owner.
dg = PyDatagram()
dg.addUint16(OBJECT_GENERATE_CMU)
dg.addUint32(client.doIdBase)
dg.addUint32(zoneId)
dg.addUint16(classId)
dg.addUint32(doId)
dg.appendData(dgi.getRemainingBytes())
self.sendToZoneExcept(zoneId, dg, [client])
def handleClientObjectUpdateField(self, datagram, dgi, targeted = False):
""" Received an update request from a client. """
connection = datagram.getConnection()
client = self.clientsByConnection[connection]
if targeted:
targetId = dgi.getUint32()
doId = dgi.getUint32()
fieldId = dgi.getUint16()
doIdBase = self.getDoIdBase(doId)
owner = self.clientsByDoIdBase.get(doIdBase)
object = owner and owner.objectsByDoId.get(doId)
if not object:
self.notify.warning(
"Ignoring update for unknown object %s from client %s" % (
doId, client.doIdBase))
return
dcfield = object.dclass.getFieldByIndex(fieldId)
if dcfield == None:
self.notify.warning(
"Ignoring update for field %s on object %s from client %s; no such field for class %s." % (
fieldId, doId, client.doIdBase, object.dclass.getName()))
if client != owner:
# This message was not sent by the object's owner.
if not dcfield.hasKeyword('clsend') and not dcfield.hasKeyword('p2p'):
self.notify.warning(
"Ignoring update for %s.%s on object %s from client %s: not owner" % (
dclass.getName(), dcfield.getName(), doId, client.doIdBase))
return
# We reformat the message slightly to insert the sender's
# doIdBase.
dg = PyDatagram()
dg.addUint16(OBJECT_UPDATE_FIELD_CMU)
dg.addUint32(client.doIdBase)
dg.addUint32(doId)
dg.addUint16(fieldId)
dg.appendData(dgi.getRemainingBytes())
if targeted:
# A targeted update: only to the indicated client.
target = self.clientsByDoIdBase.get(targetId)
if not target:
self.notify.warning(
"Ignoring targeted update to %s for %s.%s on object %s from client %s: target not known" % (
targetId,
dclass.getName(), dcfield.getName(), doId, client.doIdBase))
return
self.cw.send(dg, target.connection)
self.needsFlush.add(target)
elif dcfield.hasKeyword('p2p'):
# p2p: to object owner only
self.cw.send(dg, owner.connection)
self.needsFlush.add(owner)
elif dcfield.hasKeyword('broadcast'):
# Broadcast: to everyone except orig sender
self.sendToZoneExcept(object.zoneId, dg, [client])
elif dcfield.hasKeyword('reflect'):
# Reflect: broadcast to everyone including orig sender
self.sendToZoneExcept(object.zoneId, dg, [])
else: else:
self.notify.warning( self.notify.warning(
"Message is not broadcast, p2p, or broadcast+p2p") "Message is not broadcast or p2p")
# client disables an object, let everyone know who is in def getDoIdBase(self, doId):
# that zone know about it """ Given a doId, return the corresponding doIdBase. This
will be the owner of the object (clients may only create
object doId's within their assigned range). """
def handleClientDisable(self, datagram, doid): return int(doId / self.doIdRange) * self.doIdRange + 1
# now send disable message to all clients that need to know
if doid in self.DOIDtoZones:
self.sendToZoneExcept(self.DOIDtoZones[doid], datagram, 0)
# client deletes an object, let everyone who is in zone with def handleClientDeleteObject(self, datagram, doId):
# object know about it """ client deletes an object, let everyone who has interest in
the object's zone know about it. """
def handleClientDeleteObject(self, datagram, doid): connection = datagram.getConnection()
if doid in self.DOIDtoZones: client = self.clientsByConnection[connection]
self.sendToZoneExcept(self.DOIDtoZones[doid], datagram, 0) object = client.objectsByDoId.get(doId)
self.ClientObjects[datagram.getConnection()].remove(doid) if not object:
self.ZonetoDOIDs[self.DOIDtoZones[doid]].remove(doid) self.notify.warning(
del self.DOIDtoZones[doid] "Ignoring update for unknown object %s from client %s" % (
del self.DOIDtoDClass[doid] doId, client.doIdBase))
return
def sendAvatarGenerate(self): self.sendToZoneExcept(object.zoneId, datagram, [])
self.objectsByZoneId[object.zoneId].remove(object)
if not self.objectsByZoneId[object.zoneId]:
del self.objectsByZoneId[object.zoneId]
client.objectsByZoneId[object.zoneId].remove(object)
if not client.objectsByZoneId[object.zoneId]:
del client.objectsByZoneId[object.zoneId]
del client.objectsByDoId[doId]
self.updateClientInterestZones(client)
def handleClientObjectSetZone(self, datagram, dgi):
""" The client is telling us the object is changing to a new
zone. """
doId = dgi.getUint32()
zoneId = dgi.getUint32()
connection = datagram.getConnection()
client = self.clientsByConnection[connection]
object = client.objectsByDoId.get(doId)
if not object:
# Don't know this object.
self.notify.warning("Ignoring object location for %s: unknown" % (doId))
return
self.setObjectZone(client, object, zoneId)
def setObjectZone(self, owner, object, zoneId):
if object.zoneId == zoneId:
# No change.
return
oldZoneId = object.zoneId
self.objectsByZoneId[object.zoneId].remove(object)
if not self.objectsByZoneId[object.zoneId]:
del self.objectsByZoneId[object.zoneId]
owner.objectsByZoneId[object.zoneId].remove(object)
if not owner.objectsByZoneId[object.zoneId]:
del owner.objectsByZoneId[object.zoneId]
object.zoneId = zoneId
self.objectsByZoneId.setdefault(zoneId, set()).add(object)
owner.objectsByZoneId.setdefault(zoneId, set()).add(object)
self.updateClientInterestZones(owner)
# Any clients that are listening to oldZoneId but not zoneId
# should receive a disable message: this object has just gone
# out of scope for you.
datagram = PyDatagram() datagram = PyDatagram()
# Message type is 1 datagram.addUint16(OBJECT_DISABLE_CMU)
datagram.addUint16(ALL_OBJECT_GENERATE_WITH_REQUIRED) datagram.addUint32(object.doId)
# Avatar class type is 2 for client in self.zonesToClients[oldZoneId]:
datagram.addUint8(2) if client != owner:
# A sample id if zoneId not in client.currentInterestZoneIds:
datagram.addUint32(10) self.cw.send(datagram, client.connection)
# The only required field is the zone field self.needsFlush.add(client)
datagram.addUint32(999)
self.cw.send(datagram, self.lastConnection)
# sends the client the range of doid's that the client can use # The client is now responsible for sending a generate for the
# object that just switched zones, to inform the clients that
# are listening to the new zoneId but not the old zoneId.
def sendDoIdRange(self, client):
""" sends the client the range of doid's that the client can
use """
def sendDOIDrange(self, connection):
# reuse DOID assignments if we can
id = self.DOIDnext + self.DOIDrange
self.DOIDnext = self.DOIDnext + self.DOIDrange
self.DOIDtoClient[id] = connection
self.ClientDOIDbase[connection] = id
datagram = NetDatagram() datagram = NetDatagram()
datagram.addUint16(CLIENT_SET_DOID_RANGE) datagram.addUint16(SET_DOID_RANGE_CMU)
datagram.addUint32(id) datagram.addUint32(client.doIdBase)
datagram.addUint32(self.DOIDrange) datagram.addUint32(self.doIdRange)
print "Sending DOID range: ", id, self.DOIDrange
self.cw.send(datagram, connection)
# a client disconnected from us, we need to update our data, also tell other clients to remove self.cw.send(datagram, client.connection)
# the disconnected clients objects self.needsFlush.add(client)
def handleClientDisconnect(self, connection):
if (self.ClientIP.has_key(connection)): # a client disconnected from us, we need to update our data, also
del self.DOIDtoClient[self.ClientDOIDbase[connection]] # tell other clients to remove the disconnected clients objects
for zone in self.ClientZones[connection]: def handleClientDisconnect(self, client):
if len(self.ZonesToClients[zone]) == 1: for zoneId in client.currentInterestZoneIds:
del self.ZonesToClients[zone] if len(self.zonesToClients[zoneId]) == 1:
del self.zonesToClients[zoneId]
else: else:
self.ZonesToClients[zone].remove(connection) self.zonesToClients[zoneId].remove(client)
for obj in self.ClientObjects[connection]:
for object in client.objectsByDoId.values():
#create and send delete message #create and send delete message
datagram = NetDatagram() datagram = NetDatagram()
datagram.addUint16(CLIENT_OBJECT_DELETE_RESP) datagram.addUint16(OBJECT_DELETE_CMU)
datagram.addUint32(obj) datagram.addUint32(object.doId)
self.sendToZoneExcept(self.DOIDtoZones[obj], datagram, 0) self.sendToZoneExcept(object.zoneId, datagram, [])
self.ZonetoDOIDs[self.DOIDtoZones[obj]].remove(obj) self.objectsByZoneId[object.zoneId].remove(object)
del self.DOIDtoZones[obj] if not self.objectsByZoneId[object.zoneId]:
del self.DOIDtoDClass[obj] del self.objectsByZoneId[object.zoneId]
del self.ClientIP[connection]
del self.ClientZones[connection]
del self.ClientDOIDbase[connection]
del self.ClientObjects[connection]
# client told us its zone(s), store information client.objectsByDoId = {}
def handleSetZone(self, dgi, connection): client.objectsByZoneId = {}
del self.clientsByConnection[client.connection]
del self.clientsByDoIdBase[client.doIdBase]
id = client.doIdBase / self.doIdRange
self.idAllocator.free(id)
self.qcr.removeConnection(client.connection)
self.qcm.closeConnection(client.connection)
def handleClientSetInterest(self, client, dgi):
""" The client is specifying a particular set of zones it is
interested in. """
zoneIds = set()
while dgi.getRemainingSize() > 0: while dgi.getRemainingSize() > 0:
ZoneID = dgi.getUint32() zoneId = dgi.getUint32()
if self.ClientZones[connection].count(ZoneID) == 0: zoneIds.add(zoneId)
self.ClientZones[connection].append(ZoneID)
if ZoneID in self.ZonesToClients:
if self.ZonesToClients[ZoneID].count(connection) == 0:
self.ZonesToClients[ZoneID].append(connection)
else:
self.ZonesToClients[ZoneID] = [connection]
# We have a new member, need to get all of the data from clients who may have objects in this zone client.explicitInterestZoneIds = zoneIds
self.updateClientInterestZones(client)
def updateClientInterestZones(self, client):
""" Something about the client has caused its set of interest
zones to potentially change. Recompute them. """
origZoneIds = client.currentInterestZoneIds
newZoneIds = client.explicitInterestZoneIds | set(client.objectsByZoneId.keys())
if origZoneIds == newZoneIds:
# No change.
return
client.currentInterestZoneIds = newZoneIds
addedZoneIds = newZoneIds - origZoneIds
removedZoneIds = origZoneIds - newZoneIds
for zoneId in addedZoneIds:
self.zonesToClients.setdefault(zoneId, set()).add(client)
# The client is opening interest in this zone. Need to get
# all of the data from clients who may have objects in
# this zone
datagram = NetDatagram() datagram = NetDatagram()
datagram.addUint16(CLIENT_REQUEST_GENERATES) datagram.addUint16(REQUEST_GENERATES_CMU)
datagram.addUint32(ZoneID) datagram.addUint32(zoneId)
self.sendToAll(datagram) self.sendToZoneExcept(zoneId, datagram, [client])
print "SENDING REQUEST GENERATES (", ZoneID, ") TO ALL"
# client has moved zones, need to update them datagram = PyDatagram()
def handleRemoveZone(self, dgi, connection): datagram.addUint16(OBJECT_DISABLE_CMU)
while dgi.getRemainingSize() > 0: for zoneId in removedZoneIds:
ZoneID = dgi.getUint32() self.zonesToClients[zoneId].remove(client)
if self.ClientZones[connection].count(ZoneID) == 1:
self.ClientZones[connection].remove(ZoneID) # The client is abandoning interest in this zone. Any
if ZoneID in self.ZonesToClients: # objects in this zone should be disabled for the client.
if self.ZonesToClients[ZoneID].count(connection) == 1: for object in self.objectsByZoneId.get(zoneId, []):
self.ZonesToClients[ZoneID].remove(connection) datagram.addUint32(object.doId)
for i in self.ZonetoDOIDs[ZoneID]: self.cw.send(datagram, client.connection)
datagram = NetDatagram()
datagram.addUint16(CLIENT_OBJECT_DELETE) self.needsFlush.add(client)
datagram.addUint32(i)
self.cw.send(datagram, connection)
# client did not tell us he was leaving but we lost connection to him, so we need to update our data and tell others
def clientHardDisconnectTask(self, task): def clientHardDisconnectTask(self, task):
for i in self.ClientIP.keys(): """ client did not tell us he was leaving but we lost connection to
if not self.qcr.isConnectionOk(i): him, so we need to update our data and tell others """
self.handleClientDisconnect(i) for client in self.clientsByConnection.values():
if not self.qcr.isConnectionOk(client.connection):
self.handleClientDisconnect(client)
return Task.cont return Task.cont
# sends a message to everyone who is in the zone def sendToZoneExcept(self, zoneId, datagram, exceptionList):
def sendToZoneExcept(self, ZoneID, datagram, connection): """sends a message to everyone who has interest in the
if ZoneID in self.ZonesToClients: indicated zone, except for the clients on exceptionList."""
for conn in self.ZonesToClients[ZoneID]:
if (conn != connection):
self.cw.send(datagram, conn)
# sends a message to all connected clients if self.notify.getDebug():
def sendToAll(self, datagram): self.notify.debug(
for client in self.ClientIP.keys(): "ServerRepository sending to all in zone %s except %s:" % (zoneId, map(lambda c: c.doIdBase, exceptionList)))
self.cw.send(datagram, client) datagram.dumpHex(ostream)
for client in self.zonesToClients.get(zoneId, []):
if client not in exceptionList:
if self.notify.getDebug():
self.notify.debug(
" -> %s" % (client.doIdBase))
self.cw.send(datagram, client.connection)
self.needsFlush.add(client)
def sendToAllExcept(self, datagram, exceptionList):
""" sends a message to all connected clients, except for
clients on exceptionList. """
if self.notify.getDebug():
self.notify.debug(
"ServerRepository sending to all except %s:" % (map(lambda c: c.doIdBase, exceptionList),))
datagram.dumpHex(ostream)
for client in self.clientsByConnection.values():
if client not in exceptionList:
if self.notify.getDebug():
self.notify.debug(
" -> %s" % (client.doIdBase))
self.cw.send(datagram, client.connection)
self.needsFlush.add(client)

View File

@ -0,0 +1,183 @@
from direct.showbase.DirectObject import *
from pandac.PandaModules import *
from direct.task import Task
from direct.distributed import DistributedObject
from direct.directnotify import DirectNotifyGlobal
from direct.distributed.ClockDelta import globalClockDelta
class TimeManager(DistributedObject.DistributedObject):
"""
This DistributedObject lives on the AI and on the client side, and
serves to synchronize the time between them so they both agree, to
within a few hundred milliseconds at least, what time it is.
It uses a pull model where the client can request a
synchronization check from time to time. It also employs a
round-trip measurement to minimize the effect of latency.
"""
notify = DirectNotifyGlobal.directNotify.newCategory("TimeManager")
# The number of seconds to wait between automatic
# synchronizations. Set to 0 to disable auto sync after
# startup.
updateFreq = base.config.GetFloat('time-manager-freq', 1800)
# The minimum number of seconds to wait between two unrelated
# synchronization attempts. Increasing this number cuts down
# on frivolous synchronizations.
minWait = base.config.GetFloat('time-manager-min-wait', 10)
# The maximum number of seconds of uncertainty to tolerate in
# the clock delta without trying again.
maxUncertainty = base.config.GetFloat('time-manager-max-uncertainty', 1)
# The maximum number of attempts to try to get a low-latency
# time measurement before giving up and accepting whatever we
# get.
maxAttempts = base.config.GetInt('time-manager-max-attempts', 5)
# A simulated clock skew for debugging, in seconds.
extraSkew = base.config.GetInt('time-manager-extra-skew', 0)
if extraSkew != 0:
notify.info("Simulating clock skew of %0.3f s" % extraSkew)
reportFrameRateInterval = base.config.GetDouble('report-frame-rate-interval', 300.0)
def __init__(self, cr):
DistributedObject.DistributedObject.__init__(self, cr)
self.thisContext = -1
self.nextContext = 0
self.attemptCount = 0
self.start = 0
self.lastAttempt = -self.minWait*2
### DistributedObject methods ###
def generate(self):
"""
This method is called when the DistributedObject is reintroduced
to the world, either for the first time or from the cache.
"""
DistributedObject.DistributedObject.generate(self)
self.accept('clock_error', self.handleClockError)
if self.updateFreq > 0:
self.startTask()
def announceGenerate(self):
DistributedObject.DistributedObject.announceGenerate(self)
self.synchronize("TimeManager.announceGenerate")
def disable(self):
"""
This method is called when the DistributedObject is removed from
active duty and stored in a cache.
"""
self.ignore('clock_error')
self.stopTask()
taskMgr.remove('frameRateMonitor')
DistributedObject.DistributedObject.disable(self)
def delete(self):
"""
This method is called when the DistributedObject is permanently
removed from the world and deleted from the cache.
"""
DistributedObject.DistributedObject.delete(self)
### Task management methods ###
def startTask(self):
self.stopTask()
taskMgr.doMethodLater(self.updateFreq, self.doUpdate, "timeMgrTask")
def stopTask(self):
taskMgr.remove("timeMgrTask")
def doUpdate(self, task):
self.synchronize("timer")
# Spawn the next one
taskMgr.doMethodLater(self.updateFreq, self.doUpdate, "timeMgrTask")
return Task.done
### Automatic clock error handling ###
def handleClockError(self):
self.synchronize("clock error")
### Synchronization methods ###
def synchronize(self, description):
"""synchronize(self, string description)
Call this function from time to time to synchronize watches
with the server. This initiates a round-trip transaction;
when the transaction completes, the time will be synced.
The description is the string that will be written to the log
file regarding the reason for this synchronization attempt.
The return value is true if the attempt is made, or false if
it is too soon since the last attempt.
"""
now = globalClock.getRealTime()
if now - self.lastAttempt < self.minWait:
self.notify.debug("Not resyncing (too soon): %s" % (description))
return 0
self.talkResult = 0
self.thisContext = self.nextContext
self.attemptCount = 0
self.nextContext = (self.nextContext + 1) & 255
self.notify.info("Clock sync: %s" % (description))
self.start = now
self.lastAttempt = now
self.sendUpdate("requestServerTime", [self.thisContext])
return 1
def serverTime(self, context, timestamp):
"""serverTime(self, int8 context, int32 timestamp)
This message is sent from the AI to the client in response to
a previous requestServerTime. It contains the time as
observed by the AI.
The client should use this, in conjunction with the time
measurement taken before calling requestServerTime (above), to
determine the clock delta between the AI and the client
machines.
"""
end = globalClock.getRealTime()
if context != self.thisContext:
self.notify.info("Ignoring TimeManager response for old context %d" % (context))
return
elapsed = end - self.start
self.attemptCount += 1
self.notify.info("Clock sync roundtrip took %0.3f ms" % (elapsed * 1000.0))
average = (self.start + end) / 2.0 - self.extraSkew
uncertainty = (end - self.start) / 2.0 + abs(self.extraSkew)
globalClockDelta.resynchronize(average, timestamp, uncertainty)
self.notify.info("Local clock uncertainty +/- %.3f s" % (globalClockDelta.getUncertainty()))
if globalClockDelta.getUncertainty() > self.maxUncertainty:
if self.attemptCount < self.maxAttempts:
self.notify.info("Uncertainty is too high, trying again.")
self.start = globalClock.getRealTime()
self.sendUpdate("requestServerTime", [self.thisContext])
return
self.notify.info("Giving up on uncertainty requirement.")
messenger.send("gotTimeSync")

View File

@ -0,0 +1,23 @@
from direct.distributed.ClockDelta import *
from pandac.PandaModules import *
from direct.distributed import DistributedObjectAI
class TimeManagerAI(DistributedObjectAI.DistributedObjectAI):
notify = DirectNotifyGlobal.directNotify.newCategory("TimeManagerAI")
def __init__(self, air):
DistributedObjectAI.DistributedObjectAI.__init__(self, air)
def requestServerTime(self, context):
"""requestServerTime(self, int8 context)
This message is sent from the client to the AI to initiate a
synchronization phase. The AI should immediately report back
with its current time. The client will then measure the round
trip.
"""
timestamp = globalClockDelta.getRealNetworkTime(bits=32)
requesterId = self.air.getAvatarIdFromSender()
print "requestServerTime from %s" % (requesterId)
self.sendUpdateToAvatarId(requesterId, "serverTime",
[context, timestamp])

View File

@ -84,6 +84,33 @@ get_client_datagram() const {
return _client_datagram; return _client_datagram;
} }
////////////////////////////////////////////////////////////////////
// Function: CConnectionRepository::set_handle_datagrams_internally
// Access: Published
// Description: Sets the handle_datagrams_internally flag. When
// true, certain message types can be handled by the C++
// code in in this module. When false, all datagrams,
// regardless of message type, are passed up to Python
// for processing.
//
// The CMU distributed-object implementation requires
// this to be set false.
////////////////////////////////////////////////////////////////////
INLINE void CConnectionRepository::
set_handle_datagrams_internally(bool handle_datagrams_internally) {
_handle_datagrams_internally = handle_datagrams_internally;
}
////////////////////////////////////////////////////////////////////
// Function: CConnectionRepository::get_handle_datagrams_internally
// Access: Published
// Description: Returns the handle_datagrams_internally flag.
////////////////////////////////////////////////////////////////////
INLINE bool CConnectionRepository::
get_handle_datagrams_internally() const {
return _handle_datagrams_internally;
}
#ifdef HAVE_PYTHON #ifdef HAVE_PYTHON
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function: CConnectionRepository::set_python_repository // Function: CConnectionRepository::set_python_repository
@ -184,7 +211,7 @@ get_datagram_iterator(DatagramIterator &di) {
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function: CConnectionRepository::get_msg_channel // Function: CConnectionRepository::get_msg_channel
// Access: Published // Access: Published
// Description: Returns the channel from which the current message // Description: Returns the channel(s) to which the current message
// was sent, according to the datagram headers. This // was sent, according to the datagram headers. This
// information is not available to the client. // information is not available to the client.
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////

View File

@ -62,6 +62,7 @@ CConnectionRepository(bool has_owner_view) :
_native(false), _native(false),
#endif #endif
_client_datagram(true), _client_datagram(true),
_handle_datagrams_internally(handle_datagrams_internally),
_simulated_disconnect(false), _simulated_disconnect(false),
_verbose(distributed_cat.is_spam()), _verbose(distributed_cat.is_spam()),
// _msg_channels(), // _msg_channels(),
@ -275,13 +276,11 @@ check_datagram() {
// Start breaking apart the datagram. // Start breaking apart the datagram.
_di = DatagramIterator(_dg); _di = DatagramIterator(_dg);
if (!_client_datagram) if (!_client_datagram) {
{
unsigned char wc_cnt; unsigned char wc_cnt;
wc_cnt = _di.get_uint8(); wc_cnt = _di.get_uint8();
_msg_channels.clear(); _msg_channels.clear();
for(unsigned char lp1 = 0; lp1 < wc_cnt; lp1++) for (unsigned char lp1 = 0; lp1 < wc_cnt; lp1++) {
{
CHANNEL_TYPE schan = _di.get_uint64(); CHANNEL_TYPE schan = _di.get_uint64();
_msg_channels.push_back(schan); _msg_channels.push_back(schan);
} }
@ -301,6 +300,10 @@ check_datagram() {
_msg_type = _di.get_uint16(); _msg_type = _di.get_uint16();
// Is this a message that we can process directly? // Is this a message that we can process directly?
if (!_handle_datagrams_internally) {
return true;
}
switch (_msg_type) { switch (_msg_type) {
#ifdef HAVE_PYTHON #ifdef HAVE_PYTHON
case CLIENT_OBJECT_UPDATE_FIELD: case CLIENT_OBJECT_UPDATE_FIELD:

View File

@ -73,6 +73,9 @@ PUBLISHED:
INLINE void set_client_datagram(bool client_datagram); INLINE void set_client_datagram(bool client_datagram);
INLINE bool get_client_datagram() const; INLINE bool get_client_datagram() const;
INLINE void set_handle_datagrams_internally(bool handle_datagrams_internally);
INLINE bool get_handle_datagrams_internally() const;
#ifdef HAVE_PYTHON #ifdef HAVE_PYTHON
INLINE void set_python_repository(PyObject *python_repository); INLINE void set_python_repository(PyObject *python_repository);
#endif #endif
@ -185,6 +188,7 @@ private:
bool _has_owner_view; bool _has_owner_view;
bool _handle_c_updates; bool _handle_c_updates;
bool _client_datagram; bool _client_datagram;
bool _handle_datagrams_internally;
bool _simulated_disconnect; bool _simulated_disconnect;
bool _verbose; bool _verbose;

View File

@ -40,6 +40,13 @@ ConfigVariableDouble max_lag
"inbound messages. It is useful to test a game's tolerance of " "inbound messages. It is useful to test a game's tolerance of "
"network latency.")); "network latency."));
ConfigVariableBool handle_datagrams_internally
("handle-datagrams-internally", true,
PRC_DESC("When this is true, certain datagram types can be handled "
"directly by the C++ cConnectionRepository implementation, "
"for performance reasons. When it is false, all datagrams "
"are handled by the Python implementation."));
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
// Function: init_libdistributed // Function: init_libdistributed
// Description: Initializes the library. This must be called at // Description: Initializes the library. This must be called at

View File

@ -20,12 +20,14 @@
#include "dconfig.h" #include "dconfig.h"
#include "configVariableInt.h" #include "configVariableInt.h"
#include "configVariableDouble.h" #include "configVariableDouble.h"
#include "configVariableBool.h"
NotifyCategoryDecl(distributed, EXPCL_DIRECT, EXPTP_DIRECT); NotifyCategoryDecl(distributed, EXPCL_DIRECT, EXPTP_DIRECT);
extern ConfigVariableInt game_server_timeout_ms; extern ConfigVariableInt game_server_timeout_ms;
extern ConfigVariableDouble min_lag; extern ConfigVariableDouble min_lag;
extern ConfigVariableDouble max_lag; extern ConfigVariableDouble max_lag;
extern ConfigVariableBool handle_datagrams_internally;
extern EXPCL_DIRECT void init_libdistributed(); extern EXPCL_DIRECT void init_libdistributed();

View File

@ -0,0 +1,90 @@
# This is a sample dc file for some of the classes defined within the
# direct source tree. It is suggested that you copy this file into
# your own project (or load it from the direct source tree) and build
# on it with your own dc file for your own classes.
keyword broadcast;
keyword ram;
keyword p2p;
from direct.distributed import DistributedObject/AI
from direct.distributed import TimeManager/AI
from direct.distributed import DistributedNode/AI
from direct.distributed import DistributedSmoothNode/AI
struct BarrierData {
uint16 context;
string name;
uint32 avIds[];
};
// The most fundamental class
dclass DistributedObject {
// These are used to support DistributedObjectAI.beginBarrier() and
// the matching DistributedObject.doneBarrier(). If you don't call
// these functions, you don't care about these distributed methods.
// (Actually, you probably don't care anyway.)
setBarrierData(BarrierData data[]) broadcast ram;
setBarrierReady(uint16 context);
};
dclass TimeManager: DistributedObject {
requestServerTime(uint8 context) p2p;
serverTime(uint8 context, int32 timestamp);
};
dclass DistributedNode: DistributedObject {
setX(int16 / 10) broadcast ram;
setY(int16 / 10) broadcast ram;
setZ(int16 / 10) broadcast ram;
setH(int16 % 360 / 10) broadcast ram;
setP(int16 % 360 / 10) broadcast ram;
setR(int16 % 360 / 10) broadcast ram;
setPos: setX, setY, setZ;
setHpr: setH, setP, setR;
setPosHpr: setX, setY, setZ, setH, setP, setR;
setXY: setX, setY;
setXZ: setX, setZ;
setXYH: setX, setY, setH;
setXYZH: setX, setY, setZ, setH;
};
dclass DistributedSmoothNode: DistributedNode {
// Component set pos and hpr functions.
setComponentL(uint64) broadcast ram;
setComponentX(int16 / 10) broadcast ram;
setComponentY(int16 / 10) broadcast ram;
setComponentZ(int16 / 10) broadcast ram;
setComponentH(int16 % 360 / 10) broadcast ram;
setComponentP(int16 % 360 / 10) broadcast ram;
setComponentR(int16 % 360 / 10) broadcast ram;
setComponentT(int16 timestamp) broadcast ram;
// Composite set pos and hpr functions. These map to combinations
// of one or more of the above components. They all include
// setComponentT(), which must be called last.
setSmStop: setComponentT;
setSmH: setComponentH, setComponentT;
setSmZ: setComponentZ, setComponentT;
setSmXY: setComponentX, setComponentY, setComponentT;
setSmXZ: setComponentX, setComponentZ, setComponentT;
setSmPos: setComponentX, setComponentY, setComponentZ, setComponentT;
setSmHpr: setComponentH, setComponentP, setComponentR, setComponentT;
setSmXYH: setComponentX, setComponentY, setComponentH, setComponentT;
setSmXYZH: setComponentX, setComponentY, setComponentZ, setComponentH, setComponentT;
setSmPosHpr: setComponentX, setComponentY, setComponentZ, setComponentH, setComponentP, setComponentR, setComponentT;
// special update if L (being location, such as zoneId) changes, send everything, intended to
// keep position and 'location' in sync
setSmPosHprL: setComponentL, setComponentX, setComponentY, setComponentZ, setComponentH, setComponentP, setComponentR, setComponentT;
clearSmoothing(int8 bogus) broadcast;
suggestResync(uint32 avId, int16 timestampA, int16 timestampB,
int32 serverTimeSec, uint16 serverTimeUSec,
uint16 / 100 uncertainty);
returnResync(uint32 avId, int16 timestampB,
int32 serverTimeSec, uint16 serverTimeUSec,
uint16 / 100 uncertainty);
};