From d3f1ad07a51752f5f43452bbfb73b5cb21d88194 Mon Sep 17 00:00:00 2001 From: Mark Mine Date: Wed, 24 Apr 2002 01:06:41 +0000 Subject: [PATCH] Use new direct daemon --- direct/src/cluster/ClusterClient.py | 85 +++++++---- direct/src/cluster/ClusterConfig.py | 5 +- direct/src/cluster/ClusterMsgs.py | 12 +- direct/src/cluster/ClusterServer.py | 133 ++++++++++-------- direct/src/directtools/DirectCameraControl.py | 1 - direct/src/directtools/DirectSession.py | 10 +- 6 files changed, 155 insertions(+), 91 deletions(-) diff --git a/direct/src/cluster/ClusterClient.py b/direct/src/cluster/ClusterClient.py index 95bff62df2..f8fefca7c1 100644 --- a/direct/src/cluster/ClusterClient.py +++ b/direct/src/cluster/ClusterClient.py @@ -11,18 +11,40 @@ class ClusterClient(DirectObject.DirectObject): notify = DirectNotifyGlobal.directNotify.newCategory("ClusterClient") MGR_NUM = 1000000 - def __init__(self, configList): + def __init__(self, configList, clusterSyncFlag): + # First start up servers using direct daemon + clusterDaemonClient = base.config.GetString( + 'cluster-daemon-client', 'localhost') + clusterDaemonPort = base.config.GetInt( + 'cluster-daemon-port', CLUSTER_DAEMON_PORT) + self.daemon = DirectD() + print 'LISTEN' + self.daemon.listenTo(clusterDaemonPort) + for serverConfig in configList: + serverCommand = (SERVER_STARTUP_STRING % + (serverConfig.serverPort, + clusterSyncFlag, + clusterDaemonClient, + clusterDaemonPort)) + print 'BOOTSTRAP', serverCommand + self.daemon.clientReady(serverConfig.serverName, + clusterDaemonPort, + serverCommand) + print 'WAITING' + if not self.daemon.waitForServers(len(configList)): + print 'ERROR' + print 'DONE' self.qcm=QueuedConnectionManager() self.serverList = [] self.msgHandler = ClusterMsgHandler(ClusterClient.MGR_NUM, self.notify) for serverConfig in configList: server = DisplayConnection(self.qcm,serverConfig.serverName, - serverConfig.port,self.msgHandler) + serverConfig.serverPort,self.msgHandler) if server == None: self.notify.error('Could not open %s on %s port %d' % (serverConfig.serverConfigName, serverConfig.serverName, - serverConfig.port)) + serverConfig.serverPort)) else: server.sendCamOffset(serverConfig.xyz,serverConfig.hpr) if serverConfig.fFrustum: @@ -98,10 +120,10 @@ class ClusterClient(DirectObject.DirectObject): import sys sys.exit() -class ClusterClientSync(ClusterClient): - def __init__(self, configList): - ClusterClient.__init__(self, configList) +class ClusterClientSync(ClusterClient): + def __init__(self, configList, clusterSyncFlag): + ClusterClient.__init__(self, configList, clusterSyncFlag) #I probably don't need this self.waitForSwap = 0 self.ready = 0 @@ -135,6 +157,7 @@ class ClusterClientSync(ClusterClient): self.notify.debug('moving synced camera') ClusterClient.moveCamera(self,xyz,hpr) self.waitForSwap=1 + class DisplayConnection: def __init__(self,qcm,serverName,port,msgHandler): @@ -220,10 +243,10 @@ class DisplayConnection: self.cw.send(datagram, self.tcpConn) class ClusterConfigItem: - def __init__(self, serverConfigName, serverName, port): + def __init__(self, serverConfigName, serverName, serverPort): self.serverConfigName = serverConfigName self.serverName = serverName - self.port = port + self.serverPort = serverPort # Camera Offset self.xyz = Vec3(0) self.hpr = Vec3(0) @@ -241,6 +264,7 @@ class ClusterConfigItem: self.filmSize = filmSize self.filmOffset = filmOffset + def createClusterClient(): # setup camera offsets based on cluster-config clusterConfig = base.config.GetString('cluster-config', 'single-server') @@ -269,29 +293,29 @@ def createClusterClient(): lens = base.cam.node().getLens() lens.setViewHpr(hpr) lens.setIodOffset(pos[0]) - lens.setFocalLength(fl) - lens.setFilmSize(fs[0], fs[1]) - lens.setFilmOffset(fo[0], fo[1]) + if fl is not None: + lens.setFocalLength(fl) + if fs is not None: + lens.setFilmSize(fs[0], fs[1]) + if fo is not None: + lens.setFilmOffset(fo[0], fo[1]) else: serverConfigName = 'cluster-server-%s' % displayName - serverString = base.config.GetString(serverConfigName, '') - if serverString == '': + serverName = base.config.GetString(serverConfigName, '') + if serverName == '': base.notify.warning( '%s undefined in Configrc: expected by %s display client.'% (serverConfigName,clusterConfig)) base.notify.warning('%s will not be used.' % serverConfigName) else: - serverInfo = string.split(serverString) - serverName = serverInfo[0] - if len(serverInfo) > 1: - port = int(serverInfo[1]) - else: - # Use default port - port = CLUSTER_PORT + # Server port + serverPortConfigName = 'cluster-server-port-%s' % displayName + serverPort = base.config.GetInt(serverPortConfigName, + CLUSTER_SERVER_PORT) cci = ClusterConfigItem( serverConfigName, serverName, - port) + serverPort) # Init cam offset cci.setCamOffset(pos, hpr) # Init frustum if specified @@ -300,10 +324,23 @@ def createClusterClient(): displayConfigs.append(cci) # Create Cluster Managers (opening connections to servers) # Are the servers going to be synced? - if base.config.GetBool('cluster-sync', 0): + clusterSyncFlag = base.config.GetBool('cluster-sync', 0) + if clusterSyncFlag: base.win.setSync(1) - return ClusterClientSync(displayConfigs) + return ClusterClientSync(displayConfigs, clusterSyncFlag) else: - return ClusterClient(displayConfigs) + return ClusterClient(displayConfigs, clusterSyncFlag) +class DummyClusterClient(DirectObject.DirectObject): + """ Dummy class to handle command strings when not in cluster mode """ + notify = DirectNotifyGlobal.directNotify.newCategory("ClusterClient") + def __init__(self): + pass + + def cmd(self, commandString, fLocally = 1): + if fLocally: + # Execute locally + exec( commandString, globals() ) + + diff --git a/direct/src/cluster/ClusterConfig.py b/direct/src/cluster/ClusterConfig.py index e3b67d4e06..7fea169d0d 100644 --- a/direct/src/cluster/ClusterConfig.py +++ b/direct/src/cluster/ClusterConfig.py @@ -25,11 +25,12 @@ ClientConfigs = { 'hpr' : Vec3(0)} ], 'two-server' : [{'display name' : 'display0', + 'display mode' : 'client', 'pos' : Vec3(0), - 'hpr' : Vec3(-60,0,0)}, + 'hpr' : Vec3(-30,0,0)}, {'display name' : 'display1', 'pos' : Vec3(0), - 'hpr' : Vec3(60,0,0)} + 'hpr' : Vec3(30,0,0)} ], 'mono-modelcave-pipe0': [{'display name' : 'display0', 'pos' : Vec3(0), diff --git a/direct/src/cluster/ClusterMsgs.py b/direct/src/cluster/ClusterMsgs.py index fc8b3cda6b..b883e8ae5c 100644 --- a/direct/src/cluster/ClusterMsgs.py +++ b/direct/src/cluster/ClusterMsgs.py @@ -19,7 +19,17 @@ CLUSTER_SELECTED_MOVEMENT = 7 CLUSTER_EXIT = 100 #Port number for cluster rendering -CLUSTER_PORT = 1970 +CLUSTER_SERVER_PORT = 1970 +CLUSTER_DAEMON_PORT = 8001 + +SERVER_STARTUP_STRING = ( + 'bash ppython -c ' + + '"import __builtin__; ' + + '__builtin__.clusterServerPort = %s;' + + '__builtin__.clusterSyncFlag = %d;' + + '__builtin__.clusterDaemonClient = \'%s\';' + + '__builtin__.clusterDaemonPort = %d;' + 'from ShowBaseGlobal import *"') class ClusterMsgHandler: """ClusterMsgHandler: wrapper for PC clusters/multi-piping networking""" diff --git a/direct/src/cluster/ClusterServer.py b/direct/src/cluster/ClusterServer.py index 623e14e1e2..f1e9a3c9c0 100644 --- a/direct/src/cluster/ClusterServer.py +++ b/direct/src/cluster/ClusterServer.py @@ -7,6 +7,12 @@ import DirectNotifyGlobal import DirectObject import Task +# NOTE: This assumes the following variables are set via bootstrap command line +# arguments on server startup: +# clusterServerPort +# clusterSyncFlag +# clusterDaemonClient +# clusterDaemonPort # Also, I'm not sure multiple camera-group configurations are working for the # cluster system. @@ -15,6 +21,9 @@ class ClusterServer(DirectObject.DirectObject): MSG_NUM = 2000000 def __init__(self,cameraJig,camera): + import pdb + pdb.set_trace() + print clusterServerPort, clusterSyncFlag, clusterDaemonClient, clusterDaemonPort # Store information about the cluster's camera self.cameraJig = cameraJig self.camera = camera @@ -26,13 +35,39 @@ class ClusterServer(DirectObject.DirectObject): self.qcl = QueuedConnectionListener(self.qcm, 0) self.qcr = QueuedConnectionReader(self.qcm, 0) self.cw = ConnectionWriter(self.qcm,0) - port = base.config.GetInt('cluster-server-port', CLUSTER_PORT) + try: + port = clusterServerPort + except NameError: + port = CLUSTER_SERVER_PORT self.tcpRendezvous = self.qcm.openTCPServerRendezvous(port, 1) self.qcl.addConnection(self.tcpRendezvous) self.msgHandler = ClusterMsgHandler(ClusterServer.MSG_NUM, self.notify) # Start cluster tasks self.startListenerPollTask() self.startReaderPollTask() + # If synchronized server, start swap coordinator too + try: + clusterSyncFlag + except NameError: + clusterSyncFlag = 0 + if clusterSyncFlag: + self.startSwapCoordinator() + base.win.setSync(1) + print 'DAEMON' + # Send verification of startup to client + self.daemon = DirectD() + # These must be passed in as bootstrap arguments and stored in + # the __builtin__ namespace + try: + clusterDaemonClient + except NameError: + clusterDaemonClient = 'localhost' + try: + clusterDaemonPort + except NameError: + clusterDaemonPort = CLUSTER_DAEMON_PORT + print 'SERVER READY' + self.daemon.serverReady(clusterDaemonClient, clusterDaemonPort) def startListenerPollTask(self): # Run this task near the start of frame, sometime after the dataloop @@ -59,9 +94,14 @@ class ClusterServer(DirectObject.DirectObject): def startReaderPollTask(self): """ Task to handle datagrams from client """ # Run this task just after the listener poll task - taskMgr.add(self.readerPollTask, "serverReaderPollTask", -39) + if clusterSyncFlag: + # Sync version + taskMgr.add(self._syncReaderPollTask, "serverReaderPollTask", -39) + else: + # Asynchronous version + taskMgr.add(self._readerPollTask, "serverReaderPollTask", -39) - def readerPollTask(self, state): + def _readerPollTask(self, state): """ Non blocking task to read all available datagrams """ while 1: (datagram, dgi,type) = self.msgHandler.nonBlockingRead(self.qcr) @@ -73,6 +113,41 @@ class ClusterServer(DirectObject.DirectObject): self.handleDatagram(dgi, type) return Task.cont + def _syncReaderPollTask(self, task): + if self.lastConnection is None: + pass + elif self.qcr.isConnectionOk(self.lastConnection): + # Process datagrams till you get a postion update + type = CLUSTER_NONE + while type != CLUSTER_CAM_MOVEMENT: + # Block until you get a new datagram + (datagram,dgi,type) = self.msgHandler.blockingRead(self.qcr) + # Process datagram + self.handleDatagram(dgi,type) + return Task.cont + + def startSwapCoordinator(self): + taskMgr.add(self.swapCoordinatorTask, "serverSwapCoordinator", 51) + + def swapCoordinatorTask(self, task): + if self.fPosReceived: + self.fPosReceived = 0 + # Alert client that this server is ready to swap + self.sendSwapReady() + # Wait for swap command (processing any intermediate datagrams) + while 1: + (datagram,dgi,type) = self.msgHandler.blockingRead(self.qcr) + self.handleDatagram(dgi,type) + if type == CLUSTER_SWAP_NOW: + break + return Task.cont + + def sendSwapReady(self): + self.notify.debug( + 'send swap ready packet %d' % self.msgHandler.packetNumber) + datagram = self.msgHandler.makeSwapReadyDatagram() + self.cw.send(datagram, self.lastConnection) + def handleDatagram(self, dgi, type): """ Process a datagram depending upon type flag """ if (type == CLUSTER_NONE): @@ -130,55 +205,3 @@ class ClusterServer(DirectObject.DirectObject): command = self.msgHandler.parseCommandStringDatagram(dgi) exec( command, globals() ) -class ClusterServerSync(ClusterServer): - - def __init__(self,cameraJig,camera): - self.notify.info('starting ClusterServerSync') - ClusterServer.__init__(self,cameraJig,camera) - self.startSwapCoordinator() - - def readerPollTask(self, task): - if self.lastConnection is None: - pass - elif self.qcr.isConnectionOk(self.lastConnection): - # Process datagrams till you get a postion update - type = CLUSTER_NONE - while type != CLUSTER_CAM_MOVEMENT: - # Block until you get a new datagram - (datagram,dgi,type) = self.msgHandler.blockingRead(self.qcr) - # Process datagram - self.handleDatagram(dgi,type) - return Task.cont - - def sendSwapReady(self): - self.notify.debug( - 'send swap ready packet %d' % self.msgHandler.packetNumber) - datagram = self.msgHandler.makeSwapReadyDatagram() - self.cw.send(datagram, self.lastConnection) - - def startSwapCoordinator(self): - taskMgr.add(self.swapCoordinatorTask, "serverSwapCoordinator", 51) - - def swapCoordinatorTask(self, task): - if self.fPosReceived: - self.fPosReceived = 0 - # Alert client that this server is ready to swap - self.sendSwapReady() - # Wait for swap command (processing any intermediate datagrams) - while 1: - (datagram,dgi,type) = self.msgHandler.blockingRead(self.qcr) - self.handleDatagram(dgi,type) - if type == CLUSTER_SWAP_NOW: - break - return Task.cont - - - - - - - - - - - diff --git a/direct/src/directtools/DirectCameraControl.py b/direct/src/directtools/DirectCameraControl.py index 5d551fbc4a..bd2bbbeaf3 100644 --- a/direct/src/directtools/DirectCameraControl.py +++ b/direct/src/directtools/DirectCameraControl.py @@ -259,7 +259,6 @@ class DirectCameraControl(PandaObject): def mouseRollTask(self, state): wrtMat = state.wrtMat - print wrtMat angle = getCrankAngle(state.coaCenter) deltaAngle = angle - state.lastAngle state.lastAngle = angle diff --git a/direct/src/directtools/DirectSession.py b/direct/src/directtools/DirectSession.py index f63a7da10a..b2bd0cdc5c 100644 --- a/direct/src/directtools/DirectSession.py +++ b/direct/src/directtools/DirectSession.py @@ -170,15 +170,9 @@ class DirectSession(PandaObject): if self.clusterMode == 'client': self.cluster = createClusterClient() elif self.clusterMode == 'server': - if base.config.GetBool('cluster-sync',0): - self.cluster = ClusterServerSync(base.cameraList[0], - base.camList[0]) - base.win.setSync(1) - else: - self.cluster = ClusterServer(base.cameraList[0], - base.camList[0]) + self.cluster = ClusterServer(base.cameraList[0], base.camList[0]) else: - self.cluster = None + self.cluster = DummyClusterClient() def enable(self): # Make sure old tasks are shut down