diff --git a/direct/src/cluster/ClusterClient.py b/direct/src/cluster/ClusterClient.py index 971fa3154c..9721541a68 100644 --- a/direct/src/cluster/ClusterClient.py +++ b/direct/src/cluster/ClusterClient.py @@ -92,6 +92,11 @@ class DisplayConnection: datagram = self.msgHandler.makeSwapNowDatagram() self.cw.send(datagram, self.tcpConn) + def sendCommandString(self, commandString): + ClusterManager.notify.debug("send command string: %s" % commandString) + datagram = self.msgHandler.makeCommandStringDatagram(commandString) + self.cw.send(datagram, self.tcpConn) + class ClusterManager(DirectObject.DirectObject): notify = DirectNotifyGlobal.directNotify.newCategory("ClusterClient") MGR_NUM = 1000000 @@ -128,10 +133,16 @@ class ClusterManager(DirectObject.DirectObject): def moveCameraTask(self,task): self.moveCamera( - base.camera.getPos(render), - base.camera.getHpr(render)) + direct.camera.getPos(render), + direct.camera.getHpr(render)) return Task.cont + def clusterCommand(self, commandString): + # Execute remotely + for server in self.serverList: + server.sendCommandString(commandString) + # Execute locally + exec( commandString, globals() ) class ClusterManagerSync(ClusterManager): @@ -153,24 +164,17 @@ class ClusterManagerSync(ClusterManager): self.waitForSwap=0 self.notify.debug( "START get swaps----------------------------------") - localClock = ClockObject() - t1 = localClock.getRealTime() for server in self.serverList: server.getSwapReady() self.notify.debug( "----------------START swap now--------------------") - t2 = localClock.getRealTime() for server in self.serverList: server.sendSwapNow() self.notify.debug( "------------------------------START swap----------") - t3 = localClock.getRealTime() base.win.swap() - t4 = localClock.getRealTime() self.notify.debug( "------------------------------------------END swap") - self.notify.debug( ("times=%f %f %f %f" % (t1,t2,t3,t4)) ) - self.notify.debug( ("deltas=%f %f %f" % (t2-t1,t3-t2,t4-t3)) ) return Task.cont def moveCamera(self,xyz,hpr): diff --git a/direct/src/cluster/ClusterMsgs.py b/direct/src/cluster/ClusterMsgs.py index 9fd17eae4d..34ba60f2e0 100644 --- a/direct/src/cluster/ClusterMsgs.py +++ b/direct/src/cluster/ClusterMsgs.py @@ -14,6 +14,7 @@ CLUSTER_CAM_FRUSTUM = 2 CLUSTER_POS_UPDATE = 3 CLUSTER_SWAP_READY = 4 CLUSTER_SWAP_NOW = 5 +CLUSTER_COMMAND_STRING = 6 #Port number for cluster rendering CLUSTER_PORT = 1970 @@ -49,7 +50,6 @@ class MsgHandler: else: type = CLUSTER_NOTHING dgi = None - return (type,dgi) def readHeader(self,datagram): @@ -60,21 +60,16 @@ class MsgHandler: return (type,dgi) def blockingRead(self,qcr): - availGetVal = 0 - while not availGetVal: - availGetVal = qcr.dataAvailable() - if not availGetVal: - # The following may not be necessary. - # I just wanted some - # time given to the operating system while - # busy waiting. - time.sleep(0.002) - type = CLUSTER_NOTHING + while not qcr.dataAvailable(): + # The following may not be necessary. + # I just wanted some + # time given to the operating system while + # busy waiting. + time.sleep(0.002) datagram = NetDatagram() readRetVal = qcr.getData(datagram) if not readRetVal: self.notify.warning("getData returned false") - return datagram def makeCamOffsetDatagram(self,xyz,hpr): @@ -90,6 +85,14 @@ class MsgHandler: datagram.addFloat32(hpr[2]) return datagram + def makeCommandStringDatagram(self, commandString): + datagram = Datagram.Datagram() + datagram.addUint32(self.packetNumber) + self.packetNumber = self.packetNumber + 1 + datagram.addUint8(CLUSTER_COMMAND_STRING) + datagram.addString(commandString) + return datagram + def makeCamFrustumDatagram(self,focalLength, filmSize, filmOffset): datagram = Datagram.Datagram() datagram.addUint32(self.packetNumber) diff --git a/direct/src/cluster/ClusterServer.py b/direct/src/cluster/ClusterServer.py index b9959c00f4..e892bfef77 100644 --- a/direct/src/cluster/ClusterServer.py +++ b/direct/src/cluster/ClusterServer.py @@ -26,30 +26,33 @@ class ClusterServer(DirectObject.DirectObject): MSG_NUM = 2000000 def __init__(self,cameraGroup,camera): + # Store information about the cluster's camera + self.cameraGroup = cameraGroup + self.camera = camera + self.lens = camera.node().getLens() + # Initialize camera offsets + self.posOffset = Vec3(0,0,0) + self.hprOffset = Vec3(0,0,0) + # Create network layer objects + self.lastConnection = None self.qcm = QueuedConnectionManager() self.qcl = QueuedConnectionListener(self.qcm, 0) self.qcr = QueuedConnectionReader(self.qcm, 0) self.cw = ConnectionWriter(self.qcm,0) port = base.config.GetInt("cluster-server-port",CLUSTER_PORT) self.tcpRendezvous = self.qcm.openTCPServerRendezvous(port, 1) - print self.tcpRendezvous - self.cameraGroup = cameraGroup - self.camera = camera - self.lens = camera.node().getLens() self.qcl.addConnection(self.tcpRendezvous) self.msgHandler = MsgHandler(ClusterServer.MSG_NUM,self.notify) + # Start cluster tasks self.startListenerPollTask() self.startReaderPollTask() - self.posOffset = Vec3(0,0,0) - self.hprOffset = Vec3(0,0,0) - return None def startListenerPollTask(self): - task = Task.Task(self.listenerPoll) - taskMgr.add(task, "serverListenerPollTask") - return None + taskMgr.add(self.listenerPollTask, "serverListenerPollTask",-40) - def listenerPoll(self, task): + def listenerPollTask(self, task): + """ Task to listen for a new connection from the client """ + # Run this task after the dataloop if self.qcl.newConnectionAvailable(): print "New connection is available" rendezvous = PointerToConnection() @@ -69,26 +72,38 @@ class ClusterServer(DirectObject.DirectObject): return Task.cont def startReaderPollTask(self): - task = Task.Task(self.readerPollUntilEmpty,-10) - taskMgr.add(task, "serverReaderPollTask") - return None + """ Task to handle datagrams from client """ + # Run this task just after the listener poll task and dataloop + taskMgr.add(self.readerPollTask, "serverReaderPollTask", -39) - def readerPollUntilEmpty(self, task): - while self.readerPollOnce(): - pass - return Task.cont - - def readerPollOnce(self): - availGetVal = self.qcr.dataAvailable() - if availGetVal: + def readerPollTask(self): + while self.qcr.dataAvailable(): datagram = NetDatagram() readRetVal = self.qcr.getData(datagram) if readRetVal: self.handleDatagram(datagram) else: self.notify.warning("getData returned false") - return availGetVal + return Task.cont + def handleDatagram(self, datagram): + (type, dgi) = self.msgHandler.nonBlockingRead(self.qcr) + if type==CLUSTER_CAM_OFFSET: + self.handleCamOffset(dgi) + elif type==CLUSTER_CAM_FRUSTUM: + self.handleCamFrustum(dgi) + elif type==CLUSTER_POS_UPDATE: + self.handleCamMovement(dgi) + elif type==CLUSTER_SWAP_READY: + pass + elif type==CLUSTER_SWAP_NOW: + pass + elif type==CLUSTER_COMMAND_STRING: + self.handleCommandString(dgi) + else: + self.notify.warning("recieved unknown packet") + return type + def handleCamOffset(self,dgi): x=dgi.getFloat32() y=dgi.getFloat32() @@ -129,83 +144,40 @@ class ClusterServer(DirectObject.DirectObject): finalR = r + self.hprOffset[2] self.cameraGroup.setPosHpr(render,finalX,finalY,finalZ, finalH,finalP,finalR) + + def handleCommandString(self, dgi): + command = dgi.getString() + exec( command, globals() ) - def handleDatagram(self, datagram): - (type, dgi) = self.msgHandler.nonBlockingRead(self.qcr) - if type==CLUSTER_CAM_OFFSET: - self.handleCamOffset(dgi) - elif type==CLUSTER_CAM_FRUSTUM: - self.handleCamFrustum(dgi) - elif type==CLUSTER_POS_UPDATE: - self.handleCamMovement(dgi) - elif type==CLUSTER_SWAP_READY: - pass - elif type==CLUSTER_SWAP_NOW: - pass - else: - self.notify.warning("recieved unknown packet") - return type - class ClusterServerSync(ClusterServer): def __init__(self,cameraGroup,camera): self.notify.info('starting ClusterServerSync') - self.startReading = 0 self.posRecieved = 0 ClusterServer.__init__(self,cameraGroup,camera) self.startSwapCoordinator() return None - def startListenerPollTask(self): - task = Task.Task(self.listenerPoll,-2) - taskMgr.add(task, "serverListenerPollTask") - return None - - def listenerPoll(self, task): - if self.qcl.newConnectionAvailable(): - print "New connection is available" - rendezvous = PointerToConnection() - netAddress = NetAddress() - newConnection = PointerToConnection() - retVal = self.qcl.getNewConnection(rendezvous, netAddress, - newConnection) - if retVal: - # Crazy dereferencing - newConnection=newConnection.p() - self.qcr.addConnection(newConnection) - print "Got a connection!" - self.lastConnection = newConnection + def readerPollTask(self, task): + if self.lastConnection is None: + pass + elif self.qcr.isConnectionOk(self.lastConnection): + # Process datagrams till you get a postion update + type = CLUSTER_NOTHING + while type != CLUSTER_POS_UPDATE: datagram = self.msgHandler.blockingRead(self.qcr) (type,dgi) = self.msgHandler.readHeader(datagram) - if type==CLUSTER_CAM_OFFSET: + if type == CLUSTER_POS_UPDATE: + # Move camera + self.handleCamMovement(dgi) + # Set flag for swap coordinator + self.posRecieved = 1 + elif type == CLUSTER_CAM_OFFSET: + # Update camera offset self.handleCamOffset(dgi) - else: - self.notify.warning("Wanted cam offset, got something else") - self.startReading = 1 - # now that we have the offset read, can start reading - else: - self.notify.warning("getNewConnection returned false") - return Task.cont - - def startReaderPollTask(self): - task = Task.Task(self.readPos,-1) - taskMgr.add(task, "serverReadPosTask") - return None - - def readPos(self, task): - if self.startReading and self.qcr.isConnectionOk(self.lastConnection): - datagram = self.msgHandler.blockingRead(self.qcr) - (type,dgi) = self.msgHandler.readHeader(datagram) - if type == CLUSTER_POS_UPDATE: - self.posRecieved = 1 - self.handleCamMovement(dgi) - elif type == CLUSTER_CAM_OFFSET: - self.handleCamOffset(dgi) - else: - self.notify.warning('expected pos or orientation, instead got %d' % type) - else: - self.startReading = 0 # keep this 0 as long as connection not ok - + elif type == CLUSTER_COMMAND_STRING: + # Got a command, execute it + self.handleCommandString(dgi) return Task.cont def sendSwapReady(self): @@ -215,30 +187,18 @@ class ClusterServerSync(ClusterServer): self.cw.send(datagram, self.lastConnection) def startSwapCoordinator(self): - task = Task.Task(self.swapCoordinatorTask, 51) - taskMgr.add(task, "serverSwapCoordinator") + taskMgr.add(self.swapCoordinatorTask, "serverSwapCoordinator", 51) return None def swapCoordinatorTask(self, task): if self.posRecieved: self.posRecieved = 0 - localClock = ClockObject() -# print "START send-------------------------------" - t1 = localClock.getRealTime() self.sendSwapReady() -# print "-----------START read--------------------" - t2 = localClock.getRealTime() datagram = self.msgHandler.blockingRead(self.qcr) (type,dgi) = self.msgHandler.readHeader(datagram) if type == CLUSTER_SWAP_NOW: self.notify.debug('swapping') -# print "---------------------START SWAP----------" - t3 = localClock.getRealTime() base.win.swap() - t4 = localClock.getRealTime() -# print "---------------------------------END SWAP" -# print "times=",t1,t2,t3,t4 -# print "deltas=",t2-t1,t3-t2,t4-t3 else: self.notify.warning("did not get expected swap now") return Task.cont