diff --git a/direct/src/dcparser/dcClass.cxx b/direct/src/dcparser/dcClass.cxx index 92102e133a..01ca338d7b 100644 --- a/direct/src/dcparser/dcClass.cxx +++ b/direct/src/dcparser/dcClass.cxx @@ -949,7 +949,7 @@ ai_format_update_msg_type(const string &field_name, DOID_TYPE do_id, #ifdef HAVE_PYTHON //////////////////////////////////////////////////////////////////// -// Function: DCClass::client_format_generate +// Function: DCClass::client_format_generate_CMU // Access: Published // Description: Generates a datagram containing the message necessary // to generate a new distributed object from the client. @@ -958,21 +958,16 @@ ai_format_update_msg_type(const string &field_name, DOID_TYPE do_id, // // optional_fields is a list of fieldNames to generate // in addition to the normal required fields. +// +// This method is only called by the CMU implementation. //////////////////////////////////////////////////////////////////// Datagram DCClass:: -client_format_generate(PyObject *distobj, DOID_TYPE do_id, - ZONEID_TYPE zone_id, PyObject *optional_fields) const { +client_format_generate_CMU(PyObject *distobj, DOID_TYPE do_id, + ZONEID_TYPE zone_id, + PyObject *optional_fields) const { DCPacker packer; - //packer.raw_pack_uint8('A'); - - bool has_optional_fields = (PyObject_IsTrue(optional_fields) != 0); - - if (has_optional_fields) { - packer.raw_pack_uint16(CLIENT_CREATE_OBJECT_REQUIRED_OTHER); - } else { - packer.raw_pack_uint16(CLIENT_CREATE_OBJECT_REQUIRED); - } + packer.raw_pack_uint16(CLIENT_OBJECT_GENERATE_CMU); packer.raw_pack_uint32(zone_id); packer.raw_pack_uint16(_number); @@ -992,30 +987,31 @@ client_format_generate(PyObject *distobj, DOID_TYPE do_id, } // Also specify the optional fields. - if (has_optional_fields) { - int num_optional_fields = PySequence_Size(optional_fields); - packer.raw_pack_uint16(num_optional_fields); + int num_optional_fields = 0; + if (PyObject_IsTrue(optional_fields)) { + num_optional_fields = PySequence_Size(optional_fields); + } + packer.raw_pack_uint16(num_optional_fields); - for (int i = 0; i < num_optional_fields; i++) { - PyObject *py_field_name = PySequence_GetItem(optional_fields, i); - string field_name = PyString_AsString(py_field_name); - Py_XDECREF(py_field_name); - - DCField *field = get_field_by_name(field_name); - if (field == (DCField *)NULL) { - ostringstream strm; - strm << "No field named " << field_name << " in class " << get_name() - << "\n"; - nassert_raise(strm.str()); - return Datagram(); - } - packer.raw_pack_uint16(field->get_number()); - packer.begin_pack(field); - if (!pack_required_field(packer, distobj, field)) { - return Datagram(); - } - packer.end_pack(); + for (int i = 0; i < num_optional_fields; i++) { + PyObject *py_field_name = PySequence_GetItem(optional_fields, i); + string field_name = PyString_AsString(py_field_name); + Py_XDECREF(py_field_name); + + DCField *field = get_field_by_name(field_name); + if (field == (DCField *)NULL) { + ostringstream strm; + strm << "No field named " << field_name << " in class " << get_name() + << "\n"; + nassert_raise(strm.str()); + return Datagram(); } + packer.raw_pack_uint16(field->get_number()); + packer.begin_pack(field); + if (!pack_required_field(packer, distobj, field)) { + return Datagram(); + } + packer.end_pack(); } return Datagram(packer.get_data(), packer.get_length()); diff --git a/direct/src/dcparser/dcClass.h b/direct/src/dcparser/dcClass.h index f282eb0430..8d2558a4d7 100644 --- a/direct/src/dcparser/dcClass.h +++ b/direct/src/dcparser/dcClass.h @@ -117,8 +117,8 @@ PUBLISHED: Datagram ai_format_generate(PyObject *distobj, DOID_TYPE do_id, ZONEID_TYPE parent_id, ZONEID_TYPE zone_id, CHANNEL_TYPE district_channel_id, CHANNEL_TYPE from_channel_id, PyObject *optional_fields) const; - Datagram client_format_generate(PyObject *distobj, DOID_TYPE do_id, ZONEID_TYPE zone_id, - PyObject *optional_fields) const; + Datagram client_format_generate_CMU(PyObject *distobj, DOID_TYPE do_id, + ZONEID_TYPE zone_id, PyObject *optional_fields) const; Datagram ai_database_generate_context(unsigned int context_id, DOID_TYPE parent_id, ZONEID_TYPE zone_id, CHANNEL_TYPE owner_channel, CHANNEL_TYPE database_server_id, CHANNEL_TYPE from_channel_id) const; diff --git a/direct/src/dcparser/dcmsgtypes.h b/direct/src/dcparser/dcmsgtypes.h index fc6b392dc5..0911a03585 100755 --- a/direct/src/dcparser/dcmsgtypes.h +++ b/direct/src/dcparser/dcmsgtypes.h @@ -26,9 +26,11 @@ #define STATESERVER_OBJECT_GENERATE_WITH_REQUIRED 2001 #define STATESERVER_OBJECT_GENERATE_WITH_REQUIRED_OTHER 2003 #define STATESERVER_OBJECT_UPDATE_FIELD 2004 -#define STATESERVER_OBJECT_CREATE_WITH_REQUIRED_CONTEXT 2050 +#define STATESERVER_OBJECT_CREATE_WITH_REQUIRED_CONTEXT 2050 #define STATESERVER_OBJECT_CREATE_WITH_REQUIR_OTHER_CONTEXT 2051 -#define STATESERVER_BOUNCE_MESSAGE 2086 +#define STATESERVER_BOUNCE_MESSAGE 2086 + +#define CLIENT_OBJECT_GENERATE_CMU 9002 #endif diff --git a/direct/src/distributed/ClientRepository.py b/direct/src/distributed/ClientRepository.py index f8a7d4e902..716bb36bc0 100644 --- a/direct/src/distributed/ClientRepository.py +++ b/direct/src/distributed/ClientRepository.py @@ -1,6 +1,12 @@ """ClientRepository module: contains the ClientRepository class""" -from ClientRepositoryBase import * +from ClientRepositoryBase import ClientRepositoryBase +from direct.directnotify import DirectNotifyGlobal +from MsgTypesCMU import * +from PyDatagram import PyDatagram +from PyDatagramIterator import PyDatagramIterator +from pandac.PandaModules import UniqueIdAllocator +import types class ClientRepository(ClientRepositoryBase): """ @@ -12,117 +18,249 @@ class ClientRepository(ClientRepositoryBase): """ notify = DirectNotifyGlobal.directNotify.newCategory("ClientRepository") - def __init__(self, dcFileNames = None): - ClientRepositoryBase.__init__(self, dcFileNames = dcFileNames) + # This is required by DoCollectionManager, even though it's not + # used by this implementation. + GameGlobalsId = 0 - # The DOID allocator. The CMU LAN server may choose to - # send us a block of DOIDs. If it chooses to do so, then we - # may create objects, using those DOIDs. - self.DOIDbase = 0 - self.DOIDnext = 0 - self.DOIDlast = 0 + def __init__(self, dcFileNames = None, dcSuffix = ''): + ClientRepositoryBase.__init__(self, dcFileNames = dcFileNames, dcSuffix = dcSuffix) + self.setHandleDatagramsInternally(False) - def handleSetDOIDrange(self, di): - self.DOIDbase = di.getUint32() - self.DOIDlast = self.DOIDbase + di.getUint32() - self.DOIDnext = self.DOIDbase + # The doId allocator. The CMU LAN server may choose to + # send us a block of doIds. If it chooses to do so, then we + # may create objects, using those doIds. + self.doIdAllocator = None + self.doIdBase = 0 + self.doIdLast = 0 + + # The doIdBase of the client message currently being + # processed. + self.currentSenderId = None + + def handleSetDoIdrange(self, di): + self.doIdBase = di.getUint32() + self.doIdLast = self.doIdBase + di.getUint32() + self.doIdAllocator = UniqueIdAllocator(self.doIdBase, self.doIdLast - 1) + + # Now that we've got a doId range, we can safely generate new + # distributed objects. + messenger.send('createReady') def handleRequestGenerates(self, di): # When new clients join the zone of an object, they need to hear # about it, so we send out all of our information about objects in # that particular zone. - assert self.DOIDnext < self.DOIDlast zone = di.getUint32() for obj in self.doId2do.values(): - if obj.zone == zone: - id = obj.doId - if (self.isLocalId(id)): - self.send(obj.dclass.clientFormatGenerate(obj, id, zone, [])) + if obj.zoneId == zone: + if (self.isLocalId(obj.doId)): + self.resendGenerate(obj) - def createWithRequired(self, className, zoneId = 0, optionalFields=None): - if self.DOIDnext >= self.DOIDlast: - self.notify.error( - "Cannot allocate a distributed object ID: all IDs used up.") - return None - id = self.DOIDnext - self.DOIDnext = self.DOIDnext + 1 - dclass = self.dclassesByName[className] + def resendGenerate(self, obj): + """ Sends the generate message again for an already-generated + object, presumably to inform any newly-arrived clients of this + object's current state. """ + + # get the list of "ram" fields that aren't + # required. These are fields whose values should + # persist even if they haven't been received + # lately, so we have to re-broadcast these values + # in case the new client hasn't heard their latest + # values. + extraFields = [] + for i in range(obj.dclass.getNumInheritedFields()): + field = obj.dclass.getInheritedField(i) + if field.hasKeyword('broadcast') and field.hasKeyword('ram') and not field.hasKeyword('required'): + if field.asMolecularField(): + # It's a molecular field; this means + # we have to pack the components. + # Fortunately, we'll find those + # separately through the iteration, so + # we can ignore this field itself. + continue + + extraFields.append(field.getName()) + + datagram = self.formatGenerate(obj, extraFields) + self.send(datagram) + + def handleGenerate(self, di): + self.currentSenderId = di.getUint32() + zoneId = di.getUint32() + classId = di.getUint16() + doId = di.getUint32() + + # Look up the dclass + dclass = self.dclassesByNumber[classId] + + distObj = self.doId2do.get(doId) + if distObj and distObj.dclass == dclass: + # We've already got this object. Probably this is just a + # repeat-generate, synthesized for the benefit of someone + # else who just entered the zone. Accept the new updates, + # but don't make a formal generate. + assert(self.notify.debug("performing generate-update for %s %s" % (dclass.getName(), doId))) + dclass.receiveUpdateBroadcastRequired(distObj, di) + dclass.receiveUpdateOther(distObj, di) + return + + assert(self.notify.debug("performing generate for %s %s" % (dclass.getName(), doId))) + dclass.startGenerate() + # Create a new distributed object, and put it in the dictionary + distObj = self.generateWithRequiredOtherFields(dclass, doId, di, 0, zoneId) + dclass.stopGenerate() + + def allocateDoId(self): + """ Returns a newly-allocated doId. Call freeDoId() when the + object has been deleted. """ + + return self.doIdAllocator.allocate() + + def freeDoId(self, doId): + """ Returns a doId back into the free pool for re-use. """ + + assert self.isLocalId(doId) + self.doIdAllocator.free(doId) + + def createDistributedObject(self, className = None, distObj = None, + zoneId = 0, optionalFields = None): + + """ To create a DistributedObject, you must pass in either the + name of the object's class, or an already-created instance of + the class (or both). If you pass in just a class name (to the + className parameter), then a default instance of the object + will be created, with whatever parameters the default + constructor supplies. Alternatively, if you wish to create + some initial values different from the default, you can create + the instance yourself and supply it to the distObj parameter, + then that instance will be used instead. (It should be a + newly-created object, not one that has already been manifested + on the network or previously passed through + createDistributedObject.) In either case, the new + DistributedObject is returned from this method. + + This method will issue the appropriate network commands to + make this object appear on all of the other clients. + + You should supply an initial zoneId in which to manifest the + object. The fields marked "required" or "ram" will be + broadcast to all of the other clients; if you wish to + broadcast additional field values at this time as well, pass a + list of field names in the optionalFields parameters. + """ + + if not className: + if not distObj: + self.notify.error("Must specify either a className or a distObj.") + className = distObj.__class__.__name__ + + doId = self.allocateDoId() + dclass = self.dclassesByName.get(className) + if not dclass: + self.notify.error("Unknown distributed class: %s" % (distObj.__class__)) classDef = dclass.getClassDef() if classDef == None: self.notify.error("Could not create an undefined %s object." % ( dclass.getName())) - obj = classDef(self) - obj.dclass = dclass - obj.zone = zoneId - obj.doId = id - self.doId2do[id] = obj - obj.generateInit() - obj._retrieveCachedData() - obj.generate() - obj.announceGenerate() - datagram = dclass.clientFormatGenerate(obj, id, zoneId, optionalFields) - self.send(datagram) - return obj - def sendDisableMsg(self, doId): - datagram = PyDatagram() - datagram.addUint16(CLIENT_OBJECT_DISABLE) - datagram.addUint32(doId) + if not distObj: + distObj = classDef(self) + if not isinstance(distObj, classDef): + self.notify.error("Object %s is not an instance of %s" % (distObj.__class__.__name__, classDef.__name__)) + + distObj.dclass = dclass + distObj.doId = doId + self.doId2do[doId] = distObj + distObj.generateInit() + distObj._retrieveCachedData() + distObj.generate() + distObj.setLocation(0, zoneId) + distObj.announceGenerate() + datagram = self.formatGenerate(distObj, optionalFields) self.send(datagram) + return distObj + + def formatGenerate(self, distObj, extraFields): + """ Returns a datagram formatted for sending the generate message for the indicated object. """ + return distObj.dclass.clientFormatGenerateCMU(distObj, distObj.doId, distObj.zoneId, extraFields) def sendDeleteMsg(self, doId): datagram = PyDatagram() - datagram.addUint16(CLIENT_OBJECT_DELETE) + datagram.addUint16(OBJECT_DELETE_CMU) datagram.addUint32(doId) self.send(datagram) - def sendRemoveZoneMsg(self, zoneId, visibleZoneList=None): - datagram = PyDatagram() - datagram.addUint16(CLIENT_REMOVE_ZONE) - datagram.addUint32(zoneId) + def sendDisconnect(self): + if self.isConnected(): + # Tell the game server that we're going: + datagram = PyDatagram() + # Add message type + datagram.addUint16(CLIENT_DISCONNECT_CMU) + # Send the message + self.send(datagram) + self.notify.info("Sent disconnect message to server") + self.disconnect() + self.stopHeartbeat() - # if we have an explicit list of visible zones, add them - if visibleZoneList is not None: - vzl = list(visibleZoneList) - vzl.sort() - assert PythonUtil.uniqueElements(vzl) - for zone in vzl: - datagram.addUint32(zone) - - # send the message - self.send(datagram) - - def sendUpdateZone(self, obj, zoneId): - id = obj.doId - assert self.isLocalId(id) - self.sendDeleteMsg(id, 1) - obj.zone = zoneId - self.send(obj.dclass.clientFormatGenerate(obj, id, zoneId, [])) - - def sendSetZoneMsg(self, zoneId, visibleZoneList=None): + def setInterestZones(self, interestZoneIds): + """ Changes the set of zones that this particular client is + interested in hearing about. """ + datagram = PyDatagram() # Add message type - datagram.addUint16(CLIENT_SET_ZONE_CMU) - # Add zone id - datagram.addUint32(zoneId) + datagram.addUint16(CLIENT_SET_INTEREST_CMU) - # if we have an explicit list of visible zones, add them - if visibleZoneList is not None: - vzl = list(visibleZoneList) - vzl.sort() - assert PythonUtil.uniqueElements(vzl) - for zone in vzl: - datagram.addUint32(zone) + for zoneId in interestZoneIds: + datagram.addUint32(zoneId) # send the message self.send(datagram) - def isLocalId(self, id): - return ((id >= self.DOIDbase) and (id < self.DOIDlast)) + def setObjectZone(self, distObj, zoneId): + """ Moves the object into the indicated zone. """ + distObj.b_setLocation(0, zoneId) + assert distObj.zoneId == zoneId + + # Tell all of the clients monitoring the new zone that we've + # arrived. + self.resendGenerate(distObj) + + def sendSetLocation(self, doId, parentId, zoneId): + datagram = PyDatagram() + datagram.addUint16(OBJECT_SET_ZONE_CMU) + datagram.addUint32(doId) + datagram.addUint32(zoneId) + self.send(datagram) + + def sendHeartbeat(self): + datagram = PyDatagram() + # Add message type + datagram.addUint16(CLIENT_HEARTBEAT_CMU) + # Send it! + self.send(datagram) + self.lastHeartbeat = globalClock.getRealTime() + # This is important enough to consider flushing immediately + # (particularly if we haven't run readerPollTask recently). + self.considerFlush() + + def isLocalId(self, doId): + """ Returns true if this doId is one that we're the owner of, + false otherwise. """ + + return ((doId >= self.doIdBase) and (doId < self.doIdLast)) def haveCreateAuthority(self): - return (self.DOIDlast > self.DOIDnext) + """ Returns true if this client has been assigned a range of + doId's it may use to create objects, false otherwise. """ + + return (self.doIdLast > self.doIdBase) + + def getAvatarIdFromSender(self): + """ Returns the doIdBase of the client that originally sent + the current update message. This is only defined when + processing an update message or a generate message. """ + return self.currentSenderId def handleDatagram(self, di): if self.notify.getDebug(): @@ -130,23 +268,22 @@ class ClientRepository(ClientRepositoryBase): di.getDatagram().dumpHex(ostream) msgType = self.getMsgType() + self.currentSenderId = None # These are the sort of messages we may expect from the public # Panda server. - if msgType == CLIENT_SET_DOID_RANGE: - self.handleSetDOIDrange(di) - elif msgType == CLIENT_CREATE_OBJECT_REQUIRED_RESP: - self.handleGenerateWithRequired(di) - elif msgType == CLIENT_CREATE_OBJECT_REQUIRED_OTHER_RESP: - self.handleGenerateWithRequiredOther(di) - elif msgType == CLIENT_OBJECT_UPDATE_FIELD_RESP: + if msgType == SET_DOID_RANGE_CMU: + self.handleSetDoIdrange(di) + elif msgType == OBJECT_GENERATE_CMU: + self.handleGenerate(di) + elif msgType == OBJECT_UPDATE_FIELD_CMU: self.handleUpdateField(di) - elif msgType == CLIENT_OBJECT_DELETE_RESP: - self.handleDelete(di) - elif msgType == CLIENT_OBJECT_DISABLE_RESP: + elif msgType == OBJECT_DISABLE_CMU: self.handleDisable(di) - elif msgType == CLIENT_REQUEST_GENERATES: + elif msgType == OBJECT_DELETE_CMU: + self.handleDelete(di) + elif msgType == REQUEST_GENERATES_CMU: self.handleRequestGenerates(di) else: self.handleMessageType(msgType, di) @@ -156,57 +293,90 @@ class ClientRepository(ClientRepositoryBase): self.considerHeartbeat() def handleMessageType(self, msgType, di): - self.notify.error("unrecognized message") + self.notify.error("unrecognized message type %s" % (msgType)) - def handleGenerateWithRequired(self, di): - # Get the class Id - classId = di.getUint16() - # Get the DO Id + def handleUpdateField(self, di): + # The CMU update message starts with an additional field, not + # present in the Disney update message: the doIdBase of the + # original sender. Extract that and call up to the parent. + self.currentSenderId = di.getUint32() + ClientRepositoryBase.handleUpdateField(self, di) + + def handleDisable(self, di): + # Receives a list of doIds. + while di.getRemainingSize() > 0: + doId = di.getUint32() + + # We should never get a disable message for our own object. + assert not self.isLocalId(doId) + self.disableDoId(doId) + + def handleDelete(self, di): + # Receives a single doId. doId = di.getUint32() - # Look up the dclass - dclass = self.dclassesByNumber[classId] - dclass.startGenerate() - # Create a new distributed object, and put it in the dictionary - distObj = self.generateWithRequiredFields(dclass, doId, di) - dclass.stopGenerate() + self.deleteObject(doId) - def generateWithRequiredFields(self, dclass, doId, di): + def deleteObject(self, doId): + """ + Removes the object from the client's view of the world. This + should normally not be called directly except in the case of + error recovery, since the server will normally be responsible + for deleting and disabling objects as they go out of scope. + + After this is called, future updates by server on this object + will be ignored (with a warning message). The object will + become valid again the next time the server sends a generate + message for this doId. + + This is not a distributed message and does not delete the + object on the server or on any other client. + """ if self.doId2do.has_key(doId): - # ...it is in our dictionary. - # Just update it. - distObj = self.doId2do[doId] - assert distObj.dclass == dclass - distObj.generate() - distObj.updateRequiredFields(dclass, di) - # updateRequiredFields calls announceGenerate + # If it is in the dictionary, remove it. + obj = self.doId2do[doId] + # Remove it from the dictionary + del self.doId2do[doId] + # Disable, announce, and delete the object itself... + # unless delayDelete is on... + obj.deleteOrDelay() + if self.isLocalId(doId): + self.freeDoId(doId) elif self.cache.contains(doId): - # ...it is in the cache. - # Pull it out of the cache: - distObj = self.cache.retrieve(doId) - assert distObj.dclass == dclass - # put it in the dictionary: - self.doId2do[doId] = distObj - # and update it. - distObj.generate() - distObj.updateRequiredFields(dclass, di) - # updateRequiredFields calls announceGenerate + # If it is in the cache, remove it. + self.cache.delete(doId) + if self.isLocalId(doId): + self.freeDoId(doId) else: - # ...it is not in the dictionary or the cache. - # Construct a new one - classDef = dclass.getClassDef() - if classDef == None: - self.notify.error("Could not create an undefined %s object." % ( - dclass.getName())) - distObj = classDef(self) - distObj.dclass = dclass - # Assign it an Id - distObj.doId = doId - # Put the new do in the dictionary - self.doId2do[doId] = distObj - # Update the required fields - distObj.generateInit() # Only called when constructed - distObj._retrieveCachedData() - distObj.generate() - distObj.updateRequiredFields(dclass, di) - # updateRequiredFields calls announceGenerate - return distObj + # Otherwise, ignore it + self.notify.warning( + "Asked to delete non-existent DistObj " + str(doId)) + + def sendUpdate(self, distObj, fieldName, args): + """ Sends a normal update for a single field. """ + dg = distObj.dclass.clientFormatUpdate( + fieldName, distObj.doId, args) + self.send(dg) + + def sendUpdateToChannel(self, distObj, channelId, fieldName, args): + + """ Sends a targeted update of a single field to a particular + client. The top 32 bits of channelId is ignored; the lower 32 + bits should be the client Id of the recipient (i.e. the + client's doIdbase). The field update will be sent to the + indicated client only. The field must be marked clsend or + p2p, and may not be marked broadcast. """ + + datagram = distObj.dclass.clientFormatUpdate( + fieldName, distObj.doId, args) + dgi = PyDatagramIterator(datagram) + + # Reformat the packed datagram to change the message type and + # add the target id. + dgi.getUint16() + + dg = PyDatagram() + dg.addUint16(CLIENT_OBJECT_UPDATE_FIELD_TARGETED_CMU) + dg.addUint32(channelId & 0xffffffff) + dg.appendData(dgi.getRemainingBytes()) + + self.send(dg) diff --git a/direct/src/distributed/ClientRepositoryBase.py b/direct/src/distributed/ClientRepositoryBase.py index 381616d6ee..9a887114e2 100644 --- a/direct/src/distributed/ClientRepositoryBase.py +++ b/direct/src/distributed/ClientRepositoryBase.py @@ -25,9 +25,9 @@ class ClientRepositoryBase(ConnectionRepository): """ notify = DirectNotifyGlobal.directNotify.newCategory("ClientRepositoryBase") - def __init__(self, dcFileNames = None): - self.dcSuffix="" + def __init__(self, dcFileNames = None, dcSuffix = ''): ConnectionRepository.__init__(self, ConnectionRepository.CM_HTTP, base.config, hasOwnerView=True) + self.dcSuffix = dcSuffix if hasattr(self, 'setVerbose'): if self.config.GetBool('verbose-clientrepository'): self.setVerbose(1) @@ -81,7 +81,7 @@ class ClientRepositoryBase(ConnectionRepository): if self.deferredGenerates: taskMgr.remove('deferredGenerate') - taskMgr.doMethodLater(self.deferInterval, self.__doDeferredGenerate, 'deferredGenerate') + taskMgr.doMethodLater(self.deferInterval, self.doDeferredGenerate, 'deferredGenerate') ## def queryObjectAll(self, doID, context=0): ## """ @@ -117,18 +117,6 @@ class ClientRepositoryBase(ConnectionRepository): # we might get a list of message names, use the first one return makeList(MsgId2Names.get(msgId, 'UNKNOWN MESSAGE: %s' % msgId))[0] - def sendDisconnect(self): - if self.isConnected(): - # Tell the game server that we're going: - datagram = PyDatagram() - # Add message type - datagram.addUint16(CLIENT_DISCONNECT) - # Send the message - self.send(datagram) - self.notify.info("Sent disconnect message to server") - self.disconnect() - self.stopHeartbeat() - def allocateContext(self): self.context+=1 return self.context @@ -162,67 +150,7 @@ class ClientRepositoryBase(ConnectionRepository): """ return time.time() + self.serverDelta - def handleGenerateWithRequired(self, di): - parentId = di.getUint32() - zoneId = di.getUint32() - assert parentId == self.GameGlobalsId or parentId in self.doId2do - # Get the class Id - classId = di.getUint16() - # Get the DO Id - doId = di.getUint32() - # Look up the dclass - dclass = self.dclassesByNumber[classId] - dclass.startGenerate() - # Create a new distributed object, and put it in the dictionary - distObj = self.generateWithRequiredFields(dclass, doId, di, parentId, zoneId) - dclass.stopGenerate() - - def handleGenerateWithRequiredOther(self, di): - parentId = di.getUint32() - zoneId = di.getUint32() - # Get the class Id - classId = di.getUint16() - # Get the DO Id - doId = di.getUint32() - - dclass = self.dclassesByNumber[classId] - - deferrable = getattr(dclass.getClassDef(), 'deferrable', False) - if not self.deferInterval or self.noDefer: - deferrable = False - - now = globalClock.getFrameTime() - if self.deferredGenerates or deferrable: - # This object is deferrable, or there are already deferred - # objects in the queue (so all objects have to be held - # up). - if self.deferredGenerates or now - self.lastGenerate < self.deferInterval: - # Queue it for later. - assert(self.notify.debug("deferring generate for %s %s" % (dclass.getName(), doId))) - self.deferredGenerates.append((CLIENT_CREATE_OBJECT_REQUIRED_OTHER, doId)) - - # Keep a copy of the datagram, and move the di to the copy - dg = Datagram(di.getDatagram()) - di = DatagramIterator(dg, di.getCurrentIndex()) - - self.deferredDoIds[doId] = ((parentId, zoneId, classId, doId, di), deferrable, dg, []) - if len(self.deferredGenerates) == 1: - # We just deferred the first object on the queue; - # start the task to generate it. - taskMgr.remove('deferredGenerate') - taskMgr.doMethodLater(self.deferInterval, self.__doDeferredGenerate, 'deferredGenerate') - - else: - # We haven't generated any deferrable objects in a - # while, so it's safe to go ahead and generate this - # one immediately. - self.lastGenerate = now - self.__doGenerate(parentId, zoneId, classId, doId, di) - - else: - self.__doGenerate(parentId, zoneId, classId, doId, di) - - def __doGenerate(self, parentId, zoneId, classId, doId, di): + def doGenerate(self, parentId, zoneId, classId, doId, di): # Look up the dclass assert parentId == self.GameGlobalsId or parentId in self.doId2do dclass = self.dclassesByNumber[classId] @@ -252,7 +180,7 @@ class ClientRepositoryBase(ConnectionRepository): if doId in self.deferredDoIds: args, deferrable, dg, updates = self.deferredDoIds[doId] del self.deferredDoIds[doId] - self.__doGenerate(*args) + self.doGenerate(*args) if deferrable: self.lastGenerate = globalClock.getFrameTime() @@ -272,7 +200,7 @@ class ClientRepositoryBase(ConnectionRepository): else: self.notify.warning("Ignoring deferred message %s" % (msgType)) - def __doDeferredGenerate(self, task): + def doDeferredGenerate(self, task): """ This is the task that generates an object on the deferred queue. """ @@ -290,51 +218,6 @@ class ClientRepositoryBase(ConnectionRepository): # All objects are generaetd. return Task.done - def handleGenerateWithRequiredOtherOwner(self, di): - # Get the class Id - classId = di.getUint16() - # Get the DO Id - doId = di.getUint32() - # parentId and zoneId are not relevant here - parentId = di.getUint32() - zoneId = di.getUint32() - # Look up the dclass - dclass = self.dclassesByNumber[classId] - dclass.startGenerate() - # Create a new distributed object, and put it in the dictionary - distObj = self.generateWithRequiredOtherFieldsOwner(dclass, doId, di) - dclass.stopGenerate() - - def handleQuietZoneGenerateWithRequired(self, di): - # Special handler for quiet zone generates -- we need to filter - parentId = di.getUint32() - zoneId = di.getUint32() - assert parentId in self.doId2do - # Get the class Id - classId = di.getUint16() - # Get the DO Id - doId = di.getUint32() - # Look up the dclass - dclass = self.dclassesByNumber[classId] - dclass.startGenerate() - distObj = self.generateWithRequiredFields(dclass, doId, di, parentId, zoneId) - dclass.stopGenerate() - - def handleQuietZoneGenerateWithRequiredOther(self, di): - # Special handler for quiet zone generates -- we need to filter - parentId = di.getUint32() - zoneId = di.getUint32() - assert parentId in self.doId2do - # Get the class Id - classId = di.getUint16() - # Get the DO Id - doId = di.getUint32() - # Look up the dclass - dclass = self.dclassesByNumber[classId] - dclass.startGenerate() - distObj = self.generateWithRequiredOtherFields(dclass, doId, di, parentId, zoneId) - dclass.stopGenerate() - def generateWithRequiredFields(self, dclass, doId, di, parentId, zoneId): if self.doId2do.has_key(doId): # ...it is in our dictionary. @@ -471,12 +354,6 @@ class ClientRepositoryBase(ConnectionRepository): return distObj - def handleDisable(self, di, ownerView=False): - # Get the DO Id - doId = di.getUint32() - # disable it. - self.disableDoId(doId, ownerView) - def disableDoId(self, doId, ownerView=False): table, cache = self.getTables(ownerView) # Make sure the object exists @@ -518,7 +395,7 @@ class ClientRepositoryBase(ConnectionRepository): " is not in dictionary, ownerView=%s" % ownerView) def handleDelete(self, di): - # overridden by ToontownClientRepository + # overridden by ClientRepository assert 0 def handleUpdateField(self, di): @@ -653,25 +530,6 @@ class ClientRepositoryBase(ConnectionRepository): return doDict - def sendSetLocation(self, doId, parentId, zoneId): - datagram = PyDatagram() - datagram.addUint16(CLIENT_OBJECT_LOCATION) - datagram.addUint32(doId) - datagram.addUint32(parentId) - datagram.addUint32(zoneId) - self.send(datagram) - - def sendHeartbeat(self): - datagram = PyDatagram() - # Add message type - datagram.addUint16(CLIENT_HEARTBEAT) - # Send it! - self.send(datagram) - self.lastHeartbeat = globalClock.getRealTime() - # This is important enough to consider flushing immediately - # (particularly if we haven't run readerPollTask recently). - self.considerFlush() - def considerHeartbeat(self): """Send a heartbeat message if we haven't sent one recently.""" if not self.heartbeatStarted: diff --git a/direct/src/distributed/ConnectionRepository.py b/direct/src/distributed/ConnectionRepository.py index 41183a3ff3..260d80fd6e 100644 --- a/direct/src/distributed/ConnectionRepository.py +++ b/direct/src/distributed/ConnectionRepository.py @@ -22,6 +22,7 @@ class ConnectionRepository( """ notify = DirectNotifyGlobal.directNotify.newCategory("ConnectionRepository") taskPriority = -30 + taskChain = None CM_HTTP=0 CM_NET=1 @@ -569,7 +570,7 @@ class ConnectionRepository( self.accept(CConnectionRepository.getOverflowEventName(), self.handleReaderOverflow) taskMgr.add(self.readerPollUntilEmpty, self.uniqueName("readerPollTask"), - priority = self.taskPriority, taskChain = 'net') + priority = self.taskPriority, taskChain = self.taskChain) def stopReaderPollTask(self): taskMgr.remove(self.uniqueName("readerPollTask")) @@ -611,7 +612,7 @@ class ConnectionRepository( # Zero-length datagrams might freak out the server. No point # in sending them, anyway. if datagram.getLength() > 0: - if ConnectionRepository.notify.getDebug(): + if self.notify.getDebug(): print "ConnectionRepository sending datagram:" datagram.dumpHex(ostream) diff --git a/direct/src/distributed/DistributedNodeAI.py b/direct/src/distributed/DistributedNodeAI.py index b0b1f9e05b..f8670474d5 100644 --- a/direct/src/distributed/DistributedNodeAI.py +++ b/direct/src/distributed/DistributedNodeAI.py @@ -1,4 +1,3 @@ -from otp.ai.AIBaseGlobal import * from pandac.PandaModules import NodePath import DistributedObjectAI import GridParent diff --git a/direct/src/distributed/DistributedObject.py b/direct/src/distributed/DistributedObject.py index df706b80c2..f785e197e3 100644 --- a/direct/src/distributed/DistributedObject.py +++ b/direct/src/distributed/DistributedObject.py @@ -438,15 +438,21 @@ class DistributedObject(DistributedObjectBase): # should call doneBarrier(), which will send the context # number back to the AI. for context, name, avIds in data: - if base.localAvatar.doId in avIds: - # We found localToon's id; stop here. - self.__barrierContext = (context, name) - assert self.notify.debug('setBarrierData(%s, %s)' % (context, name)) - return + for avId in avIds: + if self.cr.isLocalId(avId): + # We found the local avatar's id; stop here. + self.__barrierContext = (context, name) + assert self.notify.debug('setBarrierData(%s, %s)' % (context, name)) + return + # This barrier didn't involve this client; ignore it. assert self.notify.debug('setBarrierData(%s)' % (None)) self.__barrierContext = None + def getBarrierData(self): + # Return a trivially-empty (context, name, avIds) value. + return ((0, '', []),) + def doneBarrier(self, name = None): # Tells the AI we have finished handling our task. If the # optional name parameter is specified, it must match the @@ -508,9 +514,6 @@ class DistributedObject(DistributedObjectBase): # avatar class overrides this to return true. return self.cr and self.cr.isLocalId(self.doId) - def updateZone(self, zoneId): - self.cr.sendUpdateZone(self, zoneId) - def isGridParent(self): # If this distributed object is a DistributedGrid return 1. 0 by default return 0 diff --git a/direct/src/distributed/DistributedObjectAI.py b/direct/src/distributed/DistributedObjectAI.py index fd3d640549..4ac8338d78 100644 --- a/direct/src/distributed/DistributedObjectAI.py +++ b/direct/src/distributed/DistributedObjectAI.py @@ -3,7 +3,6 @@ from direct.directnotify.DirectNotifyGlobal import directNotify from direct.distributed.DistributedObjectBase import DistributedObjectBase from direct.showbase import PythonUtil -from otp.ai.AIZoneData import AIZoneData from pandac.PandaModules import * #from PyDatagram import PyDatagram #from PyDatagramIterator import PyDatagramIterator @@ -303,6 +302,7 @@ class DistributedObjectAI(DistributedObjectBase): # setLocation destroys self._zoneData if we move away to # a different zone if self._zoneData is None: + from otp.ai.AIZoneData import AIZoneData self._zoneData = AIZoneData(self.air, self.parentId, self.zoneId) return self._zoneData @@ -334,9 +334,7 @@ class DistributedObjectAI(DistributedObjectBase): def sendUpdate(self, fieldName, args = []): assert self.notify.debugStateCall(self) if self.air: - dg = self.dclass.aiFormatUpdate( - fieldName, self.doId, self.doId, self.air.ourChannel, args) - self.air.sendDatagram(dg) + self.air.sendUpdate(self, fieldName, args) def GetPuppetConnectionChannel(self, doId): return doId + (1L << 32) @@ -510,14 +508,14 @@ class DistributedObjectAI(DistributedObjectBase): self.__barriers[context] = barrier # Send the context number to each involved client. - self.sendUpdate("setBarrierData", [self.__getBarrierData()]) + self.sendUpdate("setBarrierData", [self.getBarrierData()]) else: # No avatars; just call the callback immediately. callback(avIds) return context - def __getBarrierData(self): + def getBarrierData(self): # Returns the barrier data formatted for sending to the # clients. This lists all of the current outstanding barriers # and the avIds waiting for them. @@ -567,3 +565,7 @@ class DistributedObjectAI(DistributedObjectBase): def execCommand(self, string, mwMgrId, avId, zoneId): pass + def _retrieveCachedData(self): + """ This is a no-op on the AI. """ + pass + diff --git a/direct/src/distributed/DistributedSmoothNode.py b/direct/src/distributed/DistributedSmoothNode.py index 462e432d24..7ed736479b 100644 --- a/direct/src/distributed/DistributedSmoothNode.py +++ b/direct/src/distributed/DistributedSmoothNode.py @@ -378,6 +378,25 @@ class DistributedSmoothNode(DistributedNode.DistributedNode, if not self.localControl and not self.smoothStarted and \ self.smoother.getLatestPosition(): self.smoother.applySmoothPosHpr(self, self) + + # These are all required by the CMU server, which requires get* to + # match set* in more cases than the Disney server does. + def getComponentL(self): + return self.zoneId + def getComponentX(self): + return self.getX() + def getComponentY(self): + return self.getY() + def getComponentZ(self): + return self.getZ() + def getComponentH(self): + return self.getH() + def getComponentP(self): + return self.getP() + def getComponentR(self): + return self.getR() + def getComponentT(self): + return 0 @report(types = ['args'], dConfigParam = 'smoothnode') def clearSmoothing(self, bogus = None): diff --git a/direct/src/distributed/DistributedSmoothNodeAI.py b/direct/src/distributed/DistributedSmoothNodeAI.py index 68613508c1..968a7ccfa4 100755 --- a/direct/src/distributed/DistributedSmoothNodeAI.py +++ b/direct/src/distributed/DistributedSmoothNodeAI.py @@ -1,4 +1,3 @@ -from otp.ai.AIBaseGlobal import * import DistributedNodeAI import DistributedSmoothNodeBase diff --git a/direct/src/distributed/DoCollectionManager.py b/direct/src/distributed/DoCollectionManager.py index 37840c265b..e95fbed9d8 100755 --- a/direct/src/distributed/DoCollectionManager.py +++ b/direct/src/distributed/DoCollectionManager.py @@ -23,6 +23,9 @@ class DoCollectionManager: def getDo(self, doId): return self.doId2do.get(doId) + def getGameDoId(self): + return self.GameGlobalsId + def callbackWithDo(self, doId, callback): do = self.doId2do.get(doId) if do is not None: @@ -337,14 +340,14 @@ class DoCollectionManager: parentObj = self.doId2do.get(parentId) if parentObj is not None: parentObj.handleChildArrive(object, zoneId) - elif parentId not in (0, self.getGameDoId()): + elif parentId not in (None, 0, self.getGameDoId()): self.notify.warning('storeObjectLocation(%s): parent %s not present' % (object.doId, parentId)) elif oldZoneId != zoneId: parentObj = self.doId2do.get(parentId) if parentObj is not None: parentObj.handleChildArriveZone(object, zoneId) - elif parentId not in (0, self.getGameDoId()): + elif parentId not in (None, 0, self.getGameDoId()): self.notify.warning('storeObjectLocation(%s): parent %s not present' % (object.doId, parentId)) diff --git a/direct/src/distributed/MsgTypes.py b/direct/src/distributed/MsgTypes.py index a981bfc86b..ec3989a69e 100644 --- a/direct/src/distributed/MsgTypes.py +++ b/direct/src/distributed/MsgTypes.py @@ -87,7 +87,14 @@ MsgName2Id = { 'CLIENT_SET_FIELD_SENDABLE': 120, 'CLIENT_SYSTEMMESSAGE_AKNOWLEDGE': 123, - 'CLIENT_CHANGE_GENERATE_ORDER': 124, + 'CLIENT_CHANGE_GENERATE_ORDER': 124, + + 'STATESERVER_OBJECT_GENERATE_WITH_REQUIRED': 2001, + 'STATESERVER_OBJECT_GENERATE_WITH_REQUIRED_OTHER': 2003, + 'STATESERVER_OBJECT_UPDATE_FIELD': 2004, + 'STATESERVER_OBJECT_CREATE_WITH_REQUIRED_CONTEXT': 2050, + 'STATESERVER_OBJECT_CREATE_WITH_REQUIR_OTHER_CONTEXT': 2051, + 'STATESERVER_BOUNCE_MESSAGE': 2086, } # create id->name table for debugging diff --git a/direct/src/distributed/MsgTypesCMU.py b/direct/src/distributed/MsgTypesCMU.py new file mode 100644 index 0000000000..32e70d68fd --- /dev/null +++ b/direct/src/distributed/MsgTypesCMU.py @@ -0,0 +1,31 @@ +""" MsgTypesCMU module: defines the various message type codes as used +by the CMU ServerRepository/ClientRepository code in this directory. +It replaces the MsgTypes module, which is not used by the CMU +implementation. """ + +from direct.showbase.PythonUtil import invertDictLossless + +MsgName2Id = { + 'SET_DOID_RANGE_CMU' : 9001, + 'CLIENT_OBJECT_GENERATE_CMU' : 9002, + 'OBJECT_GENERATE_CMU' : 9003, + 'OBJECT_UPDATE_FIELD_CMU' : 9004, + 'OBJECT_DISABLE_CMU' : 9005, + 'OBJECT_DELETE_CMU' : 9006, + 'REQUEST_GENERATES_CMU' : 9007, + 'CLIENT_DISCONNECT_CMU' : 9008, + 'CLIENT_SET_INTEREST_CMU' : 9009, + 'OBJECT_SET_ZONE_CMU' : 9010, + 'CLIENT_HEARTBEAT_CMU' : 9011, + 'CLIENT_OBJECT_UPDATE_FIELD_TARGETED_CMU' : 9011, + + 'CLIENT_OBJECT_UPDATE_FIELD' : 24, # Matches MsgTypes.CLIENT_OBJECT_UPDATE_FIELD + } + +# create id->name table for debugging +MsgId2Names = invertDictLossless(MsgName2Id) + +# put msg names in module scope, assigned to msg value +for name, value in MsgName2Id.items(): + exec '%s = %s' % (name, value) +del name, value diff --git a/direct/src/distributed/ServerRepository.py b/direct/src/distributed/ServerRepository.py index cc4a624486..d3aa5f44ae 100644 --- a/direct/src/distributed/ServerRepository.py +++ b/direct/src/distributed/ServerRepository.py @@ -2,7 +2,7 @@ from pandac.PandaModules import * #from TaskManagerGlobal import * -from direct.distributed.MsgTypes import * +from direct.distributed.MsgTypesCMU import * from direct.task import Task from direct.directnotify import DirectNotifyGlobal from direct.distributed.PyDatagram import PyDatagram @@ -15,34 +15,129 @@ class ServerRepository: """ This maintains the server-side connection with a Panda server. It is only for use with the Panda LAN server provided by CMU.""" - notify = DirectNotifyGlobal.directNotify.newCategory("ClientRepository") + notify = DirectNotifyGlobal.directNotify.newCategory("ServerRepository") - def __init__(self, tcpPort, udpPort, dcFileNames = None): + class Client: + """ This internal class keeps track of the data associated + with each connected client. """ + def __init__(self, connection, netAddress, doIdBase): + # The connection used to communicate with the client. + self.connection = connection + + # The net address to the client, including IP address. + # Used for reporting purposes only. + self.netAddress = netAddress + + # The first doId in the range assigned to the client. + # This also serves as a unique numeric ID for this client. + # (It is sometimes called "avatarId" in some update + # messages, even though the client is not required to use + # this particular number as an avatar ID.) + self.doIdBase = doIdBase + + # The set of zoneIds that the client explicitly has + # interest in. The client will receive updates for all + # distributed objects appearing in one of these zones. + # (The client will also receive updates for all zones in + # which any one of the distributed obejcts that it has + # created still exist.) + self.explicitInterestZoneIds = set() + + # The set of interest zones sent to the client at the last + # update. This is the actual set of zones the client is + # informed of. Changing the explicitInterestZoneIds, + # above, creating or deleting objects in different zones, + # or moving objects between zones, might influence this + # set. + self.currentInterestZoneIds = set() + + # A dictionary of doId -> Object, for distributed objects + # currently in existence that were created by the client. + self.objectsByDoId = {} + + # A dictionary of zoneId -> set([Object]), listing the + # distributed objects assigned to each zone, of the + # objects created by this client. + self.objectsByZoneId = {} + + class Object: + """ This internal class keeps track of the data associated + with each extent distributed object. """ + def __init__(self, doId, zoneId, dclass): + # The object's distributed ID. + self.doId = doId + + # The object's current zone. Each object is associated + # with only one zone. + self.zoneId = zoneId + + # The object's class type. + self.dclass = dclass + + # Note that the server does not store any other data about + # the distributed objects; in particular, it doesn't + # record its current fields. That is left to the clients. + + + def __init__(self, tcpPort, udpPort = None, dcFileNames = None): + # Set up networking interfaces. self.qcm = QueuedConnectionManager() self.qcl = QueuedConnectionListener(self.qcm, 0) self.qcr = QueuedConnectionReader(self.qcm, 0) self.cw = ConnectionWriter(self.qcm, 0) self.tcpRendezvous = self.qcm.openTCPServerRendezvous(tcpPort, 10) - print self.tcpRendezvous self.qcl.addConnection(self.tcpRendezvous) taskMgr.add(self.listenerPoll, "serverListenerPollTask") taskMgr.add(self.readerPollUntilEmpty, "serverReaderPollTask") taskMgr.add(self.clientHardDisconnectTask, "clientHardDisconnect") - self.ClientIP = {} - self.ClientZones = {} - self.ClientDOIDbase = {} - self.ClientObjects = {} - self.DOIDnext = 1 - self.DOIDrange = 1000000 - self.DOIDtoClient = {} - self.DOIDtoZones = {} - self.DOIDtoDClass = {} - self.ZonesToClients = {} - self.ZonetoDOIDs = {} + + # A set of clients that have recently been written to and may + # need to be flushed. + self.needsFlush = set() + + collectTcpInterval = ConfigVariableDouble('collect-tcp-interval').getValue() + taskMgr.doMethodLater(collectTcpInterval, self.flushTask, 'flushTask') + + # A dictionary of connection -> Client object, tracking all of + # the clients we currently have connected. + self.clientsByConnection = {} + + # A similar dictionary of doIdBase -> Client object, indexing + # by the client's doIdBase number instead. + self.clientsByDoIdBase = {} + + # A dictionary of zoneId -> set([Client]), listing the clients + # that have an interest in each zoneId. + self.zonesToClients = {} + + # A dictionary of zoneId -> set([Object]), listing the + # distributed objects assigned to each zone, globally. + self.objectsByZoneId = {} + + # The number of doId's to assign to each client. Must remain + # constant during server lifetime. + self.doIdRange = base.config.GetInt('server-doid-range', 1000000) + + # An allocator object that assigns the next doIdBase to each + # client. + self.idAllocator = UniqueIdAllocator(0, 0xffffffff / self.doIdRange) + self.dcFile = DCFile() self.dcSuffix = '' self.readDCFile(dcFileNames) + def flushTask(self, task): + """ This task is run periodically to flush any connections + that might need it. It's only necessary in cases where + collect-tcp is set true (if this is false, messages are sent + immediately and do not require periodic flushing). """ + + for client in self.needsFlush: + client.connection.flush() + self.needsFlush = set() + + return task.again + def importModule(self, dcImports, moduleName, importSymbols): """ Imports the indicated moduleName and all of its symbols into the current namespace. This more-or-less reimplements @@ -142,7 +237,7 @@ class ServerRepository: classDef = dcImports.get(className) if classDef == None: - self.notify.info("No class definition for %s." % (className)) + self.notify.debug("No class definition for %s." % (className)) else: if type(classDef) == types.ModuleType: if not hasattr(classDef, className): @@ -168,31 +263,38 @@ class ServerRepository: newConnection = PointerToConnection() retVal = self.qcl.getNewConnection(rendezvous, netAddress, newConnection) - if retVal: - # Crazy dereferencing - newConnection=newConnection.p() - self.qcr.addConnection(newConnection) - # Add clients infomation to dictionary - self.ClientIP[newConnection] = netAddress.getIpString() - self.ClientZones[newConnection] = [] - self.ClientObjects[newConnection] = [] - self.lastConnection = newConnection - self.sendDOIDrange(self.lastConnection) - else: - self.notify.warning( - "getNewConnection returned false") + if not retVal: + return + + # Crazy dereferencing + newConnection=newConnection.p() + self.qcr.addConnection(newConnection) + + # Add clients information to dictionary + id = self.idAllocator.allocate() + doIdBase = id * self.doIdRange + 1 + + self.notify.info( + "Got client %s from %s" % (doIdBase, netAddress)) + + client = self.Client(newConnection, netAddress, doIdBase) + self.clientsByConnection[client.connection] = client + self.clientsByDoIdBase[client.doIdBase] = client + + self.lastConnection = newConnection + self.sendDoIdRange(client) + return Task.cont -# continuously polls for new messages on the server - def readerPollUntilEmpty(self, task): + """ continuously polls for new messages on the server """ while self.readerPollOnce(): pass return Task.cont -# checks for available messages to the server - def readerPollOnce(self): + """ checks for available messages to the server """ + availGetVal = self.qcr.dataAvailable() if availGetVal: datagram = NetDatagram() @@ -200,212 +302,388 @@ class ServerRepository: if readRetVal: # need to send to message processing unit self.handleDatagram(datagram) - else: - self.notify.warning("getData returned false") return availGetVal -# switching station for messages - def handleDatagram(self, datagram): + """ switching station for messages """ + + client = self.clientsByConnection.get(datagram.getConnection()) + + if self.notify.getDebug(): + self.notify.debug( + "ServerRepository received datagram from %s:" % (client.doIdBase)) + datagram.dumpHex(ostream) + + if not client: + # This shouldn't be possible. + self.notify.error( + "Received datagram from unknown connection.") + dgi = DatagramIterator(datagram) + type = dgi.getUint16() - if type == CLIENT_DISCONNECT: - self.handleClientDisconnect(datagram.getConnection()) - elif type == CLIENT_SET_ZONE_CMU: - self.handleSetZone(dgi, datagram.getConnection()) - elif type == CLIENT_REMOVE_ZONE: - self.handleRemoveZone(dgi, datagram.getConnection()) - elif type == CLIENT_CREATE_OBJECT_REQUIRED: - self.handleClientCreateObjectRequired(datagram, dgi) + if type == CLIENT_DISCONNECT_CMU: + self.handleClientDisconnect(client) + elif type == CLIENT_SET_INTEREST_CMU: + self.handleClientSetInterest(client, dgi) + elif type == CLIENT_OBJECT_GENERATE_CMU: + self.handleClientCreateObject(datagram, dgi) elif type == CLIENT_OBJECT_UPDATE_FIELD: - self.handleClientUpdateField(datagram, dgi) - elif type == CLIENT_OBJECT_DELETE: + self.handleClientObjectUpdateField(datagram, dgi) + elif type == CLIENT_OBJECT_UPDATE_FIELD_TARGETED_CMU: + self.handleClientObjectUpdateField(datagram, dgi, targeted = True) + elif type == OBJECT_DELETE_CMU: self.handleClientDeleteObject(datagram, dgi.getUint32()) - elif type == CLIENT_OBJECT_DISABLE: - self.handleClientDisable(datagram, dgi.getUint32()) + elif type == OBJECT_SET_ZONE_CMU: + self.handleClientObjectSetZone(datagram, dgi) else: self.handleMessageType(type, dgi) def handleMessageType(self, msgType, di): - self.notify.error("unrecognized message") + self.notify.warning("unrecognized message type %s" % (msgType)) -# client wants to create an object, so we store appropriate data, -# and then pass message along to corresponding zones + def handleClientCreateObject(self, datagram, dgi): + """ client wants to create an object, so we store appropriate + data, and then pass message along to corresponding zones """ - def handleClientCreateObjectRequired(self, datagram, dgi): connection = datagram.getConnection() - # no need to create a new message, just forward the received - # message as it has the same msg type number - zone = dgi.getUint32() - classid = dgi.getUint16() - doid = dgi.getUint32() - rest = dgi.getRemainingBytes() - datagram = NetDatagram() - datagram.addUint16(CLIENT_CREATE_OBJECT_REQUIRED) - datagram.addUint16(classid) - datagram.addUint32(doid) - datagram.appendData(rest) - dclass = self.dclassesByNumber[classid] - if self.ClientObjects[connection].count(doid) == 0: - self.ClientObjects[connection].append(doid) - self.DOIDtoZones[doid] = zone - self.DOIDtoDClass[doid] = dclass - if zone in self.ZonetoDOIDs: - if self.ZonetoDOIDs[zone].count(doid)==0: - self.ZonetoDOIDs[zone].append(doid) + zoneId = dgi.getUint32() + classId = dgi.getUint16() + doId = dgi.getUint32() + + client = self.clientsByConnection[connection] + + if self.getDoIdBase(doId) != client.doIdBase: + self.notify.warning( + "Ignoring attempt to create invalid doId %s from client %s" % (doId, client.doIdBase)) + return + + dclass = self.dclassesByNumber[classId] + + object = client.objectsByDoId.get(doId) + if object: + # This doId is already in use; thus, this message is + # really just an update. + if object.dclass != dclass: + self.notify.warning( + "Ignoring attempt to change object %s from %s to %s by client %s" % ( + doId, object.dclass.getName(), dclass.getName(), client.doIdBase)) + return + self.setObjectZone(client, object, zoneId) else: - self.ZonetoDOIDs[zone] = [doid] - self.sendToZoneExcept(zone, datagram, connection) + if self.notify.getDebug(): + self.notify.debug( + "Creating object %s of type %s by client %s" % ( + doId, dclass.getName(), client.doIdBase)) + + object = self.Object(doId, zoneId, dclass) + client.objectsByDoId[doId] = object + client.objectsByZoneId.setdefault(zoneId, set()).add(object) + self.objectsByZoneId.setdefault(zoneId, set()).add(object) + + self.updateClientInterestZones(client) - # client wants to update an object, forward message along - # to corresponding zone + # Rebuild the new datagram that we'll send on. We shim in the + # doIdBase of the owner. + dg = PyDatagram() + dg.addUint16(OBJECT_GENERATE_CMU) + dg.addUint32(client.doIdBase) + dg.addUint32(zoneId) + dg.addUint16(classId) + dg.addUint32(doId) + dg.appendData(dgi.getRemainingBytes()) + + self.sendToZoneExcept(zoneId, dg, [client]) - def handleClientUpdateField(self, datagram, dgi): + def handleClientObjectUpdateField(self, datagram, dgi, targeted = False): + """ Received an update request from a client. """ connection = datagram.getConnection() - doid = dgi.getUint32() - fieldid = dgi.getUint16() - dclass = self.DOIDtoDClass[doid] - dcfield = dclass.getFieldByIndex(fieldid) + client = self.clientsByConnection[connection] + + if targeted: + targetId = dgi.getUint32() + doId = dgi.getUint32() + fieldId = dgi.getUint16() + + doIdBase = self.getDoIdBase(doId) + owner = self.clientsByDoIdBase.get(doIdBase) + object = owner and owner.objectsByDoId.get(doId) + if not object: + self.notify.warning( + "Ignoring update for unknown object %s from client %s" % ( + doId, client.doIdBase)) + return + + dcfield = object.dclass.getFieldByIndex(fieldId) if dcfield == None: - self.notify.error( - "Received update for field %s on object %s; no such field for class %s." % ( - fieldid, doid, dclass.getName())) - return - if (dcfield.hasKeyword('broadcast')): + self.notify.warning( + "Ignoring update for field %s on object %s from client %s; no such field for class %s." % ( + fieldId, doId, client.doIdBase, object.dclass.getName())) + + if client != owner: + # This message was not sent by the object's owner. + if not dcfield.hasKeyword('clsend') and not dcfield.hasKeyword('p2p'): + self.notify.warning( + "Ignoring update for %s.%s on object %s from client %s: not owner" % ( + dclass.getName(), dcfield.getName(), doId, client.doIdBase)) + return + + # We reformat the message slightly to insert the sender's + # doIdBase. + dg = PyDatagram() + dg.addUint16(OBJECT_UPDATE_FIELD_CMU) + dg.addUint32(client.doIdBase) + dg.addUint32(doId) + dg.addUint16(fieldId) + dg.appendData(dgi.getRemainingBytes()) + + if targeted: + # A targeted update: only to the indicated client. + target = self.clientsByDoIdBase.get(targetId) + if not target: + self.notify.warning( + "Ignoring targeted update to %s for %s.%s on object %s from client %s: target not known" % ( + targetId, + dclass.getName(), dcfield.getName(), doId, client.doIdBase)) + return + self.cw.send(dg, target.connection) + self.needsFlush.add(target) + + elif dcfield.hasKeyword('p2p'): + # p2p: to object owner only + self.cw.send(dg, owner.connection) + self.needsFlush.add(owner) + + elif dcfield.hasKeyword('broadcast'): + # Broadcast: to everyone except orig sender + self.sendToZoneExcept(object.zoneId, dg, [client]) + + elif dcfield.hasKeyword('reflect'): + # Reflect: broadcast to everyone including orig sender + self.sendToZoneExcept(object.zoneId, dg, []) - if (dcfield.hasKeyword('p2p')): - self.sendToZoneExcept(self.DOIDtoZones[doid], datagram, 0) - else: - self.sendToZoneExcept(self.DOIDtoZones[doid], datagram, connection) - elif (dcfield.hasKeyword('p2p')): - doidbase = (doid / self.DOIDrange) * self.DOIDrange - self.cw.send(datagram, self.DOIDtoClient[doidbase]) else: - self.notify.warning( - "Message is not broadcast, p2p, or broadcast+p2p") + self.notify.warning( + "Message is not broadcast or p2p") - # client disables an object, let everyone know who is in - # that zone know about it + def getDoIdBase(self, doId): + """ Given a doId, return the corresponding doIdBase. This + will be the owner of the object (clients may only create + object doId's within their assigned range). """ - def handleClientDisable(self, datagram, doid): - # now send disable message to all clients that need to know - if doid in self.DOIDtoZones: - self.sendToZoneExcept(self.DOIDtoZones[doid], datagram, 0) + return int(doId / self.doIdRange) * self.doIdRange + 1 - # client deletes an object, let everyone who is in zone with - # object know about it + def handleClientDeleteObject(self, datagram, doId): + """ client deletes an object, let everyone who has interest in + the object's zone know about it. """ - def handleClientDeleteObject(self, datagram, doid): - if doid in self.DOIDtoZones: - self.sendToZoneExcept(self.DOIDtoZones[doid], datagram, 0) - self.ClientObjects[datagram.getConnection()].remove(doid) - self.ZonetoDOIDs[self.DOIDtoZones[doid]].remove(doid) - del self.DOIDtoZones[doid] - del self.DOIDtoDClass[doid] + connection = datagram.getConnection() + client = self.clientsByConnection[connection] + object = client.objectsByDoId.get(doId) + if not object: + self.notify.warning( + "Ignoring update for unknown object %s from client %s" % ( + doId, client.doIdBase)) + return - def sendAvatarGenerate(self): + self.sendToZoneExcept(object.zoneId, datagram, []) + + self.objectsByZoneId[object.zoneId].remove(object) + if not self.objectsByZoneId[object.zoneId]: + del self.objectsByZoneId[object.zoneId] + client.objectsByZoneId[object.zoneId].remove(object) + if not client.objectsByZoneId[object.zoneId]: + del client.objectsByZoneId[object.zoneId] + del client.objectsByDoId[doId] + + self.updateClientInterestZones(client) + + def handleClientObjectSetZone(self, datagram, dgi): + """ The client is telling us the object is changing to a new + zone. """ + doId = dgi.getUint32() + zoneId = dgi.getUint32() + + connection = datagram.getConnection() + client = self.clientsByConnection[connection] + object = client.objectsByDoId.get(doId) + if not object: + # Don't know this object. + self.notify.warning("Ignoring object location for %s: unknown" % (doId)) + return + + self.setObjectZone(client, object, zoneId) + + def setObjectZone(self, owner, object, zoneId): + if object.zoneId == zoneId: + # No change. + return + + oldZoneId = object.zoneId + self.objectsByZoneId[object.zoneId].remove(object) + if not self.objectsByZoneId[object.zoneId]: + del self.objectsByZoneId[object.zoneId] + owner.objectsByZoneId[object.zoneId].remove(object) + if not owner.objectsByZoneId[object.zoneId]: + del owner.objectsByZoneId[object.zoneId] + + object.zoneId = zoneId + self.objectsByZoneId.setdefault(zoneId, set()).add(object) + owner.objectsByZoneId.setdefault(zoneId, set()).add(object) + + self.updateClientInterestZones(owner) + + # Any clients that are listening to oldZoneId but not zoneId + # should receive a disable message: this object has just gone + # out of scope for you. datagram = PyDatagram() - # Message type is 1 - datagram.addUint16(ALL_OBJECT_GENERATE_WITH_REQUIRED) - # Avatar class type is 2 - datagram.addUint8(2) - # A sample id - datagram.addUint32(10) - # The only required field is the zone field - datagram.addUint32(999) - self.cw.send(datagram, self.lastConnection) + datagram.addUint16(OBJECT_DISABLE_CMU) + datagram.addUint32(object.doId) + for client in self.zonesToClients[oldZoneId]: + if client != owner: + if zoneId not in client.currentInterestZoneIds: + self.cw.send(datagram, client.connection) + self.needsFlush.add(client) - # sends the client the range of doid's that the client can use + # The client is now responsible for sending a generate for the + # object that just switched zones, to inform the clients that + # are listening to the new zoneId but not the old zoneId. + + def sendDoIdRange(self, client): + """ sends the client the range of doid's that the client can + use """ - def sendDOIDrange(self, connection): - # reuse DOID assignments if we can - id = self.DOIDnext + self.DOIDrange - self.DOIDnext = self.DOIDnext + self.DOIDrange - self.DOIDtoClient[id] = connection - self.ClientDOIDbase[connection] = id datagram = NetDatagram() - datagram.addUint16(CLIENT_SET_DOID_RANGE) - datagram.addUint32(id) - datagram.addUint32(self.DOIDrange) - print "Sending DOID range: ", id, self.DOIDrange - self.cw.send(datagram, connection) + datagram.addUint16(SET_DOID_RANGE_CMU) + datagram.addUint32(client.doIdBase) + datagram.addUint32(self.doIdRange) - # a client disconnected from us, we need to update our data, also tell other clients to remove - # the disconnected clients objects - def handleClientDisconnect(self, connection): - if (self.ClientIP.has_key(connection)): - del self.DOIDtoClient[self.ClientDOIDbase[connection]] - for zone in self.ClientZones[connection]: - if len(self.ZonesToClients[zone]) == 1: - del self.ZonesToClients[zone] - else: - self.ZonesToClients[zone].remove(connection) - for obj in self.ClientObjects[connection]: - #create and send delete message - datagram = NetDatagram() - datagram.addUint16(CLIENT_OBJECT_DELETE_RESP) - datagram.addUint32(obj) - self.sendToZoneExcept(self.DOIDtoZones[obj], datagram, 0) - self.ZonetoDOIDs[self.DOIDtoZones[obj]].remove(obj) - del self.DOIDtoZones[obj] - del self.DOIDtoDClass[obj] - del self.ClientIP[connection] - del self.ClientZones[connection] - del self.ClientDOIDbase[connection] - del self.ClientObjects[connection] + self.cw.send(datagram, client.connection) + self.needsFlush.add(client) - # client told us its zone(s), store information - def handleSetZone(self, dgi, connection): - while dgi.getRemainingSize() > 0: - ZoneID = dgi.getUint32() - if self.ClientZones[connection].count(ZoneID) == 0: - self.ClientZones[connection].append(ZoneID) - if ZoneID in self.ZonesToClients: - if self.ZonesToClients[ZoneID].count(connection) == 0: - self.ZonesToClients[ZoneID].append(connection) + # a client disconnected from us, we need to update our data, also + # tell other clients to remove the disconnected clients objects + def handleClientDisconnect(self, client): + for zoneId in client.currentInterestZoneIds: + if len(self.zonesToClients[zoneId]) == 1: + del self.zonesToClients[zoneId] else: - self.ZonesToClients[ZoneID] = [connection] + self.zonesToClients[zoneId].remove(client) - # We have a new member, need to get all of the data from clients who may have objects in this zone + for object in client.objectsByDoId.values(): + #create and send delete message datagram = NetDatagram() - datagram.addUint16(CLIENT_REQUEST_GENERATES) - datagram.addUint32(ZoneID) - self.sendToAll(datagram) - print "SENDING REQUEST GENERATES (", ZoneID, ") TO ALL" + datagram.addUint16(OBJECT_DELETE_CMU) + datagram.addUint32(object.doId) + self.sendToZoneExcept(object.zoneId, datagram, []) + self.objectsByZoneId[object.zoneId].remove(object) + if not self.objectsByZoneId[object.zoneId]: + del self.objectsByZoneId[object.zoneId] - # client has moved zones, need to update them - def handleRemoveZone(self, dgi, connection): + client.objectsByDoId = {} + client.objectsByZoneId = {} + + del self.clientsByConnection[client.connection] + del self.clientsByDoIdBase[client.doIdBase] + + id = client.doIdBase / self.doIdRange + self.idAllocator.free(id) + + self.qcr.removeConnection(client.connection) + self.qcm.closeConnection(client.connection) + + + def handleClientSetInterest(self, client, dgi): + """ The client is specifying a particular set of zones it is + interested in. """ + + zoneIds = set() while dgi.getRemainingSize() > 0: - ZoneID = dgi.getUint32() - if self.ClientZones[connection].count(ZoneID) == 1: - self.ClientZones[connection].remove(ZoneID) - if ZoneID in self.ZonesToClients: - if self.ZonesToClients[ZoneID].count(connection) == 1: - self.ZonesToClients[ZoneID].remove(connection) - for i in self.ZonetoDOIDs[ZoneID]: - datagram = NetDatagram() - datagram.addUint16(CLIENT_OBJECT_DELETE) - datagram.addUint32(i) - self.cw.send(datagram, connection) + zoneId = dgi.getUint32() + zoneIds.add(zoneId) - # client did not tell us he was leaving but we lost connection to him, so we need to update our data and tell others + client.explicitInterestZoneIds = zoneIds + self.updateClientInterestZones(client) + + def updateClientInterestZones(self, client): + """ Something about the client has caused its set of interest + zones to potentially change. Recompute them. """ + + origZoneIds = client.currentInterestZoneIds + newZoneIds = client.explicitInterestZoneIds | set(client.objectsByZoneId.keys()) + if origZoneIds == newZoneIds: + # No change. + return + + client.currentInterestZoneIds = newZoneIds + addedZoneIds = newZoneIds - origZoneIds + removedZoneIds = origZoneIds - newZoneIds + + for zoneId in addedZoneIds: + self.zonesToClients.setdefault(zoneId, set()).add(client) + + # The client is opening interest in this zone. Need to get + # all of the data from clients who may have objects in + # this zone + datagram = NetDatagram() + datagram.addUint16(REQUEST_GENERATES_CMU) + datagram.addUint32(zoneId) + self.sendToZoneExcept(zoneId, datagram, [client]) + + datagram = PyDatagram() + datagram.addUint16(OBJECT_DISABLE_CMU) + for zoneId in removedZoneIds: + self.zonesToClients[zoneId].remove(client) + + # The client is abandoning interest in this zone. Any + # objects in this zone should be disabled for the client. + for object in self.objectsByZoneId.get(zoneId, []): + datagram.addUint32(object.doId) + self.cw.send(datagram, client.connection) + + self.needsFlush.add(client) + def clientHardDisconnectTask(self, task): - for i in self.ClientIP.keys(): - if not self.qcr.isConnectionOk(i): - self.handleClientDisconnect(i) + """ client did not tell us he was leaving but we lost connection to + him, so we need to update our data and tell others """ + for client in self.clientsByConnection.values(): + if not self.qcr.isConnectionOk(client.connection): + self.handleClientDisconnect(client) return Task.cont - # sends a message to everyone who is in the zone - def sendToZoneExcept(self, ZoneID, datagram, connection): - if ZoneID in self.ZonesToClients: - for conn in self.ZonesToClients[ZoneID]: - if (conn != connection): - self.cw.send(datagram, conn) + def sendToZoneExcept(self, zoneId, datagram, exceptionList): + """sends a message to everyone who has interest in the + indicated zone, except for the clients on exceptionList.""" + + if self.notify.getDebug(): + self.notify.debug( + "ServerRepository sending to all in zone %s except %s:" % (zoneId, map(lambda c: c.doIdBase, exceptionList))) + datagram.dumpHex(ostream) - # sends a message to all connected clients - def sendToAll(self, datagram): - for client in self.ClientIP.keys(): - self.cw.send(datagram, client) + for client in self.zonesToClients.get(zoneId, []): + if client not in exceptionList: + if self.notify.getDebug(): + self.notify.debug( + " -> %s" % (client.doIdBase)) + self.cw.send(datagram, client.connection) + self.needsFlush.add(client) + + def sendToAllExcept(self, datagram, exceptionList): + """ sends a message to all connected clients, except for + clients on exceptionList. """ + + if self.notify.getDebug(): + self.notify.debug( + "ServerRepository sending to all except %s:" % (map(lambda c: c.doIdBase, exceptionList),)) + datagram.dumpHex(ostream) + + for client in self.clientsByConnection.values(): + if client not in exceptionList: + if self.notify.getDebug(): + self.notify.debug( + " -> %s" % (client.doIdBase)) + self.cw.send(datagram, client.connection) + self.needsFlush.add(client) diff --git a/direct/src/distributed/TimeManager.py b/direct/src/distributed/TimeManager.py new file mode 100644 index 0000000000..b75ec24599 --- /dev/null +++ b/direct/src/distributed/TimeManager.py @@ -0,0 +1,183 @@ +from direct.showbase.DirectObject import * +from pandac.PandaModules import * +from direct.task import Task +from direct.distributed import DistributedObject +from direct.directnotify import DirectNotifyGlobal +from direct.distributed.ClockDelta import globalClockDelta + +class TimeManager(DistributedObject.DistributedObject): + """ + This DistributedObject lives on the AI and on the client side, and + serves to synchronize the time between them so they both agree, to + within a few hundred milliseconds at least, what time it is. + + It uses a pull model where the client can request a + synchronization check from time to time. It also employs a + round-trip measurement to minimize the effect of latency. + """ + + notify = DirectNotifyGlobal.directNotify.newCategory("TimeManager") + + # The number of seconds to wait between automatic + # synchronizations. Set to 0 to disable auto sync after + # startup. + updateFreq = base.config.GetFloat('time-manager-freq', 1800) + + # The minimum number of seconds to wait between two unrelated + # synchronization attempts. Increasing this number cuts down + # on frivolous synchronizations. + minWait = base.config.GetFloat('time-manager-min-wait', 10) + + # The maximum number of seconds of uncertainty to tolerate in + # the clock delta without trying again. + maxUncertainty = base.config.GetFloat('time-manager-max-uncertainty', 1) + + # The maximum number of attempts to try to get a low-latency + # time measurement before giving up and accepting whatever we + # get. + maxAttempts = base.config.GetInt('time-manager-max-attempts', 5) + + # A simulated clock skew for debugging, in seconds. + extraSkew = base.config.GetInt('time-manager-extra-skew', 0) + + if extraSkew != 0: + notify.info("Simulating clock skew of %0.3f s" % extraSkew) + + reportFrameRateInterval = base.config.GetDouble('report-frame-rate-interval', 300.0) + + def __init__(self, cr): + DistributedObject.DistributedObject.__init__(self, cr) + + self.thisContext = -1 + self.nextContext = 0 + self.attemptCount = 0 + self.start = 0 + self.lastAttempt = -self.minWait*2 + + ### DistributedObject methods ### + + def generate(self): + """ + This method is called when the DistributedObject is reintroduced + to the world, either for the first time or from the cache. + """ + DistributedObject.DistributedObject.generate(self) + + self.accept('clock_error', self.handleClockError) + + if self.updateFreq > 0: + self.startTask() + + def announceGenerate(self): + DistributedObject.DistributedObject.announceGenerate(self) + self.synchronize("TimeManager.announceGenerate") + + def disable(self): + """ + This method is called when the DistributedObject is removed from + active duty and stored in a cache. + """ + self.ignore('clock_error') + self.stopTask() + taskMgr.remove('frameRateMonitor') + DistributedObject.DistributedObject.disable(self) + + def delete(self): + """ + This method is called when the DistributedObject is permanently + removed from the world and deleted from the cache. + """ + DistributedObject.DistributedObject.delete(self) + + ### Task management methods ### + + def startTask(self): + self.stopTask() + taskMgr.doMethodLater(self.updateFreq, self.doUpdate, "timeMgrTask") + + def stopTask(self): + taskMgr.remove("timeMgrTask") + + def doUpdate(self, task): + self.synchronize("timer") + # Spawn the next one + taskMgr.doMethodLater(self.updateFreq, self.doUpdate, "timeMgrTask") + return Task.done + + ### Automatic clock error handling ### + + def handleClockError(self): + self.synchronize("clock error") + + ### Synchronization methods ### + + def synchronize(self, description): + """synchronize(self, string description) + + Call this function from time to time to synchronize watches + with the server. This initiates a round-trip transaction; + when the transaction completes, the time will be synced. + + The description is the string that will be written to the log + file regarding the reason for this synchronization attempt. + + The return value is true if the attempt is made, or false if + it is too soon since the last attempt. + """ + now = globalClock.getRealTime() + + if now - self.lastAttempt < self.minWait: + self.notify.debug("Not resyncing (too soon): %s" % (description)) + return 0 + + self.talkResult = 0 + self.thisContext = self.nextContext + self.attemptCount = 0 + self.nextContext = (self.nextContext + 1) & 255 + self.notify.info("Clock sync: %s" % (description)) + self.start = now + self.lastAttempt = now + self.sendUpdate("requestServerTime", [self.thisContext]) + + return 1 + + + def serverTime(self, context, timestamp): + """serverTime(self, int8 context, int32 timestamp) + + This message is sent from the AI to the client in response to + a previous requestServerTime. It contains the time as + observed by the AI. + + The client should use this, in conjunction with the time + measurement taken before calling requestServerTime (above), to + determine the clock delta between the AI and the client + machines. + """ + end = globalClock.getRealTime() + + if context != self.thisContext: + self.notify.info("Ignoring TimeManager response for old context %d" % (context)) + return + + elapsed = end - self.start + self.attemptCount += 1 + self.notify.info("Clock sync roundtrip took %0.3f ms" % (elapsed * 1000.0)) + + average = (self.start + end) / 2.0 - self.extraSkew + uncertainty = (end - self.start) / 2.0 + abs(self.extraSkew) + + globalClockDelta.resynchronize(average, timestamp, uncertainty) + + self.notify.info("Local clock uncertainty +/- %.3f s" % (globalClockDelta.getUncertainty())) + + if globalClockDelta.getUncertainty() > self.maxUncertainty: + if self.attemptCount < self.maxAttempts: + self.notify.info("Uncertainty is too high, trying again.") + self.start = globalClock.getRealTime() + self.sendUpdate("requestServerTime", [self.thisContext]) + return + self.notify.info("Giving up on uncertainty requirement.") + + messenger.send("gotTimeSync") + diff --git a/direct/src/distributed/TimeManagerAI.py b/direct/src/distributed/TimeManagerAI.py new file mode 100644 index 0000000000..a9aed1222e --- /dev/null +++ b/direct/src/distributed/TimeManagerAI.py @@ -0,0 +1,23 @@ +from direct.distributed.ClockDelta import * +from pandac.PandaModules import * +from direct.distributed import DistributedObjectAI + +class TimeManagerAI(DistributedObjectAI.DistributedObjectAI): + notify = DirectNotifyGlobal.directNotify.newCategory("TimeManagerAI") + + def __init__(self, air): + DistributedObjectAI.DistributedObjectAI.__init__(self, air) + + def requestServerTime(self, context): + """requestServerTime(self, int8 context) + + This message is sent from the client to the AI to initiate a + synchronization phase. The AI should immediately report back + with its current time. The client will then measure the round + trip. + """ + timestamp = globalClockDelta.getRealNetworkTime(bits=32) + requesterId = self.air.getAvatarIdFromSender() + print "requestServerTime from %s" % (requesterId) + self.sendUpdateToAvatarId(requesterId, "serverTime", + [context, timestamp]) diff --git a/direct/src/distributed/cConnectionRepository.I b/direct/src/distributed/cConnectionRepository.I index 1d72c4a89c..3d549bc274 100644 --- a/direct/src/distributed/cConnectionRepository.I +++ b/direct/src/distributed/cConnectionRepository.I @@ -84,6 +84,33 @@ get_client_datagram() const { return _client_datagram; } +//////////////////////////////////////////////////////////////////// +// Function: CConnectionRepository::set_handle_datagrams_internally +// Access: Published +// Description: Sets the handle_datagrams_internally flag. When +// true, certain message types can be handled by the C++ +// code in in this module. When false, all datagrams, +// regardless of message type, are passed up to Python +// for processing. +// +// The CMU distributed-object implementation requires +// this to be set false. +//////////////////////////////////////////////////////////////////// +INLINE void CConnectionRepository:: +set_handle_datagrams_internally(bool handle_datagrams_internally) { + _handle_datagrams_internally = handle_datagrams_internally; +} + +//////////////////////////////////////////////////////////////////// +// Function: CConnectionRepository::get_handle_datagrams_internally +// Access: Published +// Description: Returns the handle_datagrams_internally flag. +//////////////////////////////////////////////////////////////////// +INLINE bool CConnectionRepository:: +get_handle_datagrams_internally() const { + return _handle_datagrams_internally; +} + #ifdef HAVE_PYTHON //////////////////////////////////////////////////////////////////// // Function: CConnectionRepository::set_python_repository @@ -184,7 +211,7 @@ get_datagram_iterator(DatagramIterator &di) { //////////////////////////////////////////////////////////////////// // Function: CConnectionRepository::get_msg_channel // Access: Published -// Description: Returns the channel from which the current message +// Description: Returns the channel(s) to which the current message // was sent, according to the datagram headers. This // information is not available to the client. //////////////////////////////////////////////////////////////////// diff --git a/direct/src/distributed/cConnectionRepository.cxx b/direct/src/distributed/cConnectionRepository.cxx index 977747ad5c..4f3b26ca87 100644 --- a/direct/src/distributed/cConnectionRepository.cxx +++ b/direct/src/distributed/cConnectionRepository.cxx @@ -62,6 +62,7 @@ CConnectionRepository(bool has_owner_view) : _native(false), #endif _client_datagram(true), + _handle_datagrams_internally(handle_datagrams_internally), _simulated_disconnect(false), _verbose(distributed_cat.is_spam()), // _msg_channels(), @@ -275,15 +276,13 @@ check_datagram() { // Start breaking apart the datagram. _di = DatagramIterator(_dg); - if (!_client_datagram) - { + if (!_client_datagram) { unsigned char wc_cnt; wc_cnt = _di.get_uint8(); _msg_channels.clear(); - for(unsigned char lp1 = 0; lp1 < wc_cnt; lp1++) - { - CHANNEL_TYPE schan = _di.get_uint64(); - _msg_channels.push_back(schan); + for (unsigned char lp1 = 0; lp1 < wc_cnt; lp1++) { + CHANNEL_TYPE schan = _di.get_uint64(); + _msg_channels.push_back(schan); } _msg_sender = _di.get_uint64(); @@ -301,6 +300,10 @@ check_datagram() { _msg_type = _di.get_uint16(); // Is this a message that we can process directly? + if (!_handle_datagrams_internally) { + return true; + } + switch (_msg_type) { #ifdef HAVE_PYTHON case CLIENT_OBJECT_UPDATE_FIELD: @@ -321,7 +324,7 @@ check_datagram() { } break; #endif // HAVE_PYTHON - + default: // Some unknown message; let the caller deal with it. return true; @@ -1029,7 +1032,7 @@ bool CConnectionRepository::check_datagram_ai(PyObject *PycallBackFunction) } else { - PyObject * result = PyEval_CallObject(PycallBackFunction, _python_ai_datagramiterator); + PyObject * result = PyEval_CallObject(PycallBackFunction, _python_ai_datagramiterator); if (PyErr_Occurred()) { Py_XDECREF(doId2do); diff --git a/direct/src/distributed/cConnectionRepository.h b/direct/src/distributed/cConnectionRepository.h index 8a617ebd7b..af32f324fa 100644 --- a/direct/src/distributed/cConnectionRepository.h +++ b/direct/src/distributed/cConnectionRepository.h @@ -73,6 +73,9 @@ PUBLISHED: INLINE void set_client_datagram(bool client_datagram); INLINE bool get_client_datagram() const; + INLINE void set_handle_datagrams_internally(bool handle_datagrams_internally); + INLINE bool get_handle_datagrams_internally() const; + #ifdef HAVE_PYTHON INLINE void set_python_repository(PyObject *python_repository); #endif @@ -185,6 +188,7 @@ private: bool _has_owner_view; bool _handle_c_updates; bool _client_datagram; + bool _handle_datagrams_internally; bool _simulated_disconnect; bool _verbose; diff --git a/direct/src/distributed/config_distributed.cxx b/direct/src/distributed/config_distributed.cxx index 849b284443..93fe7a4a8e 100644 --- a/direct/src/distributed/config_distributed.cxx +++ b/direct/src/distributed/config_distributed.cxx @@ -40,6 +40,13 @@ ConfigVariableDouble max_lag "inbound messages. It is useful to test a game's tolerance of " "network latency.")); +ConfigVariableBool handle_datagrams_internally +("handle-datagrams-internally", true, + PRC_DESC("When this is true, certain datagram types can be handled " + "directly by the C++ cConnectionRepository implementation, " + "for performance reasons. When it is false, all datagrams " + "are handled by the Python implementation.")); + //////////////////////////////////////////////////////////////////// // Function: init_libdistributed // Description: Initializes the library. This must be called at diff --git a/direct/src/distributed/config_distributed.h b/direct/src/distributed/config_distributed.h index cd094c5d62..a0d4446911 100644 --- a/direct/src/distributed/config_distributed.h +++ b/direct/src/distributed/config_distributed.h @@ -20,12 +20,14 @@ #include "dconfig.h" #include "configVariableInt.h" #include "configVariableDouble.h" +#include "configVariableBool.h" NotifyCategoryDecl(distributed, EXPCL_DIRECT, EXPTP_DIRECT); extern ConfigVariableInt game_server_timeout_ms; extern ConfigVariableDouble min_lag; extern ConfigVariableDouble max_lag; +extern ConfigVariableBool handle_datagrams_internally; extern EXPCL_DIRECT void init_libdistributed(); diff --git a/direct/src/distributed/direct.dc b/direct/src/distributed/direct.dc new file mode 100644 index 0000000000..5fd6ba3023 --- /dev/null +++ b/direct/src/distributed/direct.dc @@ -0,0 +1,90 @@ +# This is a sample dc file for some of the classes defined within the +# direct source tree. It is suggested that you copy this file into +# your own project (or load it from the direct source tree) and build +# on it with your own dc file for your own classes. + +keyword broadcast; +keyword ram; +keyword p2p; + +from direct.distributed import DistributedObject/AI +from direct.distributed import TimeManager/AI +from direct.distributed import DistributedNode/AI +from direct.distributed import DistributedSmoothNode/AI + +struct BarrierData { + uint16 context; + string name; + uint32 avIds[]; +}; + +// The most fundamental class +dclass DistributedObject { + // These are used to support DistributedObjectAI.beginBarrier() and + // the matching DistributedObject.doneBarrier(). If you don't call + // these functions, you don't care about these distributed methods. + // (Actually, you probably don't care anyway.) + setBarrierData(BarrierData data[]) broadcast ram; + setBarrierReady(uint16 context); +}; + +dclass TimeManager: DistributedObject { + requestServerTime(uint8 context) p2p; + serverTime(uint8 context, int32 timestamp); +}; + +dclass DistributedNode: DistributedObject { + setX(int16 / 10) broadcast ram; + setY(int16 / 10) broadcast ram; + setZ(int16 / 10) broadcast ram; + setH(int16 % 360 / 10) broadcast ram; + setP(int16 % 360 / 10) broadcast ram; + setR(int16 % 360 / 10) broadcast ram; + + setPos: setX, setY, setZ; + setHpr: setH, setP, setR; + setPosHpr: setX, setY, setZ, setH, setP, setR; + setXY: setX, setY; + setXZ: setX, setZ; + setXYH: setX, setY, setH; + setXYZH: setX, setY, setZ, setH; +}; + +dclass DistributedSmoothNode: DistributedNode { + // Component set pos and hpr functions. + + setComponentL(uint64) broadcast ram; + setComponentX(int16 / 10) broadcast ram; + setComponentY(int16 / 10) broadcast ram; + setComponentZ(int16 / 10) broadcast ram; + setComponentH(int16 % 360 / 10) broadcast ram; + setComponentP(int16 % 360 / 10) broadcast ram; + setComponentR(int16 % 360 / 10) broadcast ram; + setComponentT(int16 timestamp) broadcast ram; + + // Composite set pos and hpr functions. These map to combinations + // of one or more of the above components. They all include + // setComponentT(), which must be called last. + setSmStop: setComponentT; + setSmH: setComponentH, setComponentT; + setSmZ: setComponentZ, setComponentT; + setSmXY: setComponentX, setComponentY, setComponentT; + setSmXZ: setComponentX, setComponentZ, setComponentT; + setSmPos: setComponentX, setComponentY, setComponentZ, setComponentT; + setSmHpr: setComponentH, setComponentP, setComponentR, setComponentT; + setSmXYH: setComponentX, setComponentY, setComponentH, setComponentT; + setSmXYZH: setComponentX, setComponentY, setComponentZ, setComponentH, setComponentT; + setSmPosHpr: setComponentX, setComponentY, setComponentZ, setComponentH, setComponentP, setComponentR, setComponentT; + // special update if L (being location, such as zoneId) changes, send everything, intended to + // keep position and 'location' in sync + setSmPosHprL: setComponentL, setComponentX, setComponentY, setComponentZ, setComponentH, setComponentP, setComponentR, setComponentT; + + clearSmoothing(int8 bogus) broadcast; + + suggestResync(uint32 avId, int16 timestampA, int16 timestampB, + int32 serverTimeSec, uint16 serverTimeUSec, + uint16 / 100 uncertainty); + returnResync(uint32 avId, int16 timestampB, + int32 serverTimeSec, uint16 serverTimeUSec, + uint16 / 100 uncertainty); +};