Use new direct daemon

This commit is contained in:
Mark Mine 2002-04-24 01:06:41 +00:00
parent 626acdc658
commit d3f1ad07a5
6 changed files with 155 additions and 91 deletions

View File

@ -11,18 +11,40 @@ class ClusterClient(DirectObject.DirectObject):
notify = DirectNotifyGlobal.directNotify.newCategory("ClusterClient") notify = DirectNotifyGlobal.directNotify.newCategory("ClusterClient")
MGR_NUM = 1000000 MGR_NUM = 1000000
def __init__(self, configList): def __init__(self, configList, clusterSyncFlag):
# First start up servers using direct daemon
clusterDaemonClient = base.config.GetString(
'cluster-daemon-client', 'localhost')
clusterDaemonPort = base.config.GetInt(
'cluster-daemon-port', CLUSTER_DAEMON_PORT)
self.daemon = DirectD()
print 'LISTEN'
self.daemon.listenTo(clusterDaemonPort)
for serverConfig in configList:
serverCommand = (SERVER_STARTUP_STRING %
(serverConfig.serverPort,
clusterSyncFlag,
clusterDaemonClient,
clusterDaemonPort))
print 'BOOTSTRAP', serverCommand
self.daemon.clientReady(serverConfig.serverName,
clusterDaemonPort,
serverCommand)
print 'WAITING'
if not self.daemon.waitForServers(len(configList)):
print 'ERROR'
print 'DONE'
self.qcm=QueuedConnectionManager() self.qcm=QueuedConnectionManager()
self.serverList = [] self.serverList = []
self.msgHandler = ClusterMsgHandler(ClusterClient.MGR_NUM, self.notify) self.msgHandler = ClusterMsgHandler(ClusterClient.MGR_NUM, self.notify)
for serverConfig in configList: for serverConfig in configList:
server = DisplayConnection(self.qcm,serverConfig.serverName, server = DisplayConnection(self.qcm,serverConfig.serverName,
serverConfig.port,self.msgHandler) serverConfig.serverPort,self.msgHandler)
if server == None: if server == None:
self.notify.error('Could not open %s on %s port %d' % self.notify.error('Could not open %s on %s port %d' %
(serverConfig.serverConfigName, (serverConfig.serverConfigName,
serverConfig.serverName, serverConfig.serverName,
serverConfig.port)) serverConfig.serverPort))
else: else:
server.sendCamOffset(serverConfig.xyz,serverConfig.hpr) server.sendCamOffset(serverConfig.xyz,serverConfig.hpr)
if serverConfig.fFrustum: if serverConfig.fFrustum:
@ -98,10 +120,10 @@ class ClusterClient(DirectObject.DirectObject):
import sys import sys
sys.exit() sys.exit()
class ClusterClientSync(ClusterClient):
def __init__(self, configList): class ClusterClientSync(ClusterClient):
ClusterClient.__init__(self, configList) def __init__(self, configList, clusterSyncFlag):
ClusterClient.__init__(self, configList, clusterSyncFlag)
#I probably don't need this #I probably don't need this
self.waitForSwap = 0 self.waitForSwap = 0
self.ready = 0 self.ready = 0
@ -135,6 +157,7 @@ class ClusterClientSync(ClusterClient):
self.notify.debug('moving synced camera') self.notify.debug('moving synced camera')
ClusterClient.moveCamera(self,xyz,hpr) ClusterClient.moveCamera(self,xyz,hpr)
self.waitForSwap=1 self.waitForSwap=1
class DisplayConnection: class DisplayConnection:
def __init__(self,qcm,serverName,port,msgHandler): def __init__(self,qcm,serverName,port,msgHandler):
@ -220,10 +243,10 @@ class DisplayConnection:
self.cw.send(datagram, self.tcpConn) self.cw.send(datagram, self.tcpConn)
class ClusterConfigItem: class ClusterConfigItem:
def __init__(self, serverConfigName, serverName, port): def __init__(self, serverConfigName, serverName, serverPort):
self.serverConfigName = serverConfigName self.serverConfigName = serverConfigName
self.serverName = serverName self.serverName = serverName
self.port = port self.serverPort = serverPort
# Camera Offset # Camera Offset
self.xyz = Vec3(0) self.xyz = Vec3(0)
self.hpr = Vec3(0) self.hpr = Vec3(0)
@ -241,6 +264,7 @@ class ClusterConfigItem:
self.filmSize = filmSize self.filmSize = filmSize
self.filmOffset = filmOffset self.filmOffset = filmOffset
def createClusterClient(): def createClusterClient():
# setup camera offsets based on cluster-config # setup camera offsets based on cluster-config
clusterConfig = base.config.GetString('cluster-config', 'single-server') clusterConfig = base.config.GetString('cluster-config', 'single-server')
@ -269,29 +293,29 @@ def createClusterClient():
lens = base.cam.node().getLens() lens = base.cam.node().getLens()
lens.setViewHpr(hpr) lens.setViewHpr(hpr)
lens.setIodOffset(pos[0]) lens.setIodOffset(pos[0])
lens.setFocalLength(fl) if fl is not None:
lens.setFilmSize(fs[0], fs[1]) lens.setFocalLength(fl)
lens.setFilmOffset(fo[0], fo[1]) if fs is not None:
lens.setFilmSize(fs[0], fs[1])
if fo is not None:
lens.setFilmOffset(fo[0], fo[1])
else: else:
serverConfigName = 'cluster-server-%s' % displayName serverConfigName = 'cluster-server-%s' % displayName
serverString = base.config.GetString(serverConfigName, '') serverName = base.config.GetString(serverConfigName, '')
if serverString == '': if serverName == '':
base.notify.warning( base.notify.warning(
'%s undefined in Configrc: expected by %s display client.'% '%s undefined in Configrc: expected by %s display client.'%
(serverConfigName,clusterConfig)) (serverConfigName,clusterConfig))
base.notify.warning('%s will not be used.' % serverConfigName) base.notify.warning('%s will not be used.' % serverConfigName)
else: else:
serverInfo = string.split(serverString) # Server port
serverName = serverInfo[0] serverPortConfigName = 'cluster-server-port-%s' % displayName
if len(serverInfo) > 1: serverPort = base.config.GetInt(serverPortConfigName,
port = int(serverInfo[1]) CLUSTER_SERVER_PORT)
else:
# Use default port
port = CLUSTER_PORT
cci = ClusterConfigItem( cci = ClusterConfigItem(
serverConfigName, serverConfigName,
serverName, serverName,
port) serverPort)
# Init cam offset # Init cam offset
cci.setCamOffset(pos, hpr) cci.setCamOffset(pos, hpr)
# Init frustum if specified # Init frustum if specified
@ -300,10 +324,23 @@ def createClusterClient():
displayConfigs.append(cci) displayConfigs.append(cci)
# Create Cluster Managers (opening connections to servers) # Create Cluster Managers (opening connections to servers)
# Are the servers going to be synced? # Are the servers going to be synced?
if base.config.GetBool('cluster-sync', 0): clusterSyncFlag = base.config.GetBool('cluster-sync', 0)
if clusterSyncFlag:
base.win.setSync(1) base.win.setSync(1)
return ClusterClientSync(displayConfigs) return ClusterClientSync(displayConfigs, clusterSyncFlag)
else: else:
return ClusterClient(displayConfigs) return ClusterClient(displayConfigs, clusterSyncFlag)
class DummyClusterClient(DirectObject.DirectObject):
""" Dummy class to handle command strings when not in cluster mode """
notify = DirectNotifyGlobal.directNotify.newCategory("ClusterClient")
def __init__(self):
pass
def cmd(self, commandString, fLocally = 1):
if fLocally:
# Execute locally
exec( commandString, globals() )

View File

@ -25,11 +25,12 @@ ClientConfigs = {
'hpr' : Vec3(0)} 'hpr' : Vec3(0)}
], ],
'two-server' : [{'display name' : 'display0', 'two-server' : [{'display name' : 'display0',
'display mode' : 'client',
'pos' : Vec3(0), 'pos' : Vec3(0),
'hpr' : Vec3(-60,0,0)}, 'hpr' : Vec3(-30,0,0)},
{'display name' : 'display1', {'display name' : 'display1',
'pos' : Vec3(0), 'pos' : Vec3(0),
'hpr' : Vec3(60,0,0)} 'hpr' : Vec3(30,0,0)}
], ],
'mono-modelcave-pipe0': [{'display name' : 'display0', 'mono-modelcave-pipe0': [{'display name' : 'display0',
'pos' : Vec3(0), 'pos' : Vec3(0),

View File

@ -19,7 +19,17 @@ CLUSTER_SELECTED_MOVEMENT = 7
CLUSTER_EXIT = 100 CLUSTER_EXIT = 100
#Port number for cluster rendering #Port number for cluster rendering
CLUSTER_PORT = 1970 CLUSTER_SERVER_PORT = 1970
CLUSTER_DAEMON_PORT = 8001
SERVER_STARTUP_STRING = (
'bash ppython -c ' +
'"import __builtin__; ' +
'__builtin__.clusterServerPort = %s;' +
'__builtin__.clusterSyncFlag = %d;' +
'__builtin__.clusterDaemonClient = \'%s\';' +
'__builtin__.clusterDaemonPort = %d;'
'from ShowBaseGlobal import *"')
class ClusterMsgHandler: class ClusterMsgHandler:
"""ClusterMsgHandler: wrapper for PC clusters/multi-piping networking""" """ClusterMsgHandler: wrapper for PC clusters/multi-piping networking"""

View File

@ -7,6 +7,12 @@ import DirectNotifyGlobal
import DirectObject import DirectObject
import Task import Task
# NOTE: This assumes the following variables are set via bootstrap command line
# arguments on server startup:
# clusterServerPort
# clusterSyncFlag
# clusterDaemonClient
# clusterDaemonPort
# Also, I'm not sure multiple camera-group configurations are working for the # Also, I'm not sure multiple camera-group configurations are working for the
# cluster system. # cluster system.
@ -15,6 +21,9 @@ class ClusterServer(DirectObject.DirectObject):
MSG_NUM = 2000000 MSG_NUM = 2000000
def __init__(self,cameraJig,camera): def __init__(self,cameraJig,camera):
import pdb
pdb.set_trace()
print clusterServerPort, clusterSyncFlag, clusterDaemonClient, clusterDaemonPort
# Store information about the cluster's camera # Store information about the cluster's camera
self.cameraJig = cameraJig self.cameraJig = cameraJig
self.camera = camera self.camera = camera
@ -26,13 +35,39 @@ class ClusterServer(DirectObject.DirectObject):
self.qcl = QueuedConnectionListener(self.qcm, 0) self.qcl = QueuedConnectionListener(self.qcm, 0)
self.qcr = QueuedConnectionReader(self.qcm, 0) self.qcr = QueuedConnectionReader(self.qcm, 0)
self.cw = ConnectionWriter(self.qcm,0) self.cw = ConnectionWriter(self.qcm,0)
port = base.config.GetInt('cluster-server-port', CLUSTER_PORT) try:
port = clusterServerPort
except NameError:
port = CLUSTER_SERVER_PORT
self.tcpRendezvous = self.qcm.openTCPServerRendezvous(port, 1) self.tcpRendezvous = self.qcm.openTCPServerRendezvous(port, 1)
self.qcl.addConnection(self.tcpRendezvous) self.qcl.addConnection(self.tcpRendezvous)
self.msgHandler = ClusterMsgHandler(ClusterServer.MSG_NUM, self.notify) self.msgHandler = ClusterMsgHandler(ClusterServer.MSG_NUM, self.notify)
# Start cluster tasks # Start cluster tasks
self.startListenerPollTask() self.startListenerPollTask()
self.startReaderPollTask() self.startReaderPollTask()
# If synchronized server, start swap coordinator too
try:
clusterSyncFlag
except NameError:
clusterSyncFlag = 0
if clusterSyncFlag:
self.startSwapCoordinator()
base.win.setSync(1)
print 'DAEMON'
# Send verification of startup to client
self.daemon = DirectD()
# These must be passed in as bootstrap arguments and stored in
# the __builtin__ namespace
try:
clusterDaemonClient
except NameError:
clusterDaemonClient = 'localhost'
try:
clusterDaemonPort
except NameError:
clusterDaemonPort = CLUSTER_DAEMON_PORT
print 'SERVER READY'
self.daemon.serverReady(clusterDaemonClient, clusterDaemonPort)
def startListenerPollTask(self): def startListenerPollTask(self):
# Run this task near the start of frame, sometime after the dataloop # Run this task near the start of frame, sometime after the dataloop
@ -59,9 +94,14 @@ class ClusterServer(DirectObject.DirectObject):
def startReaderPollTask(self): def startReaderPollTask(self):
""" Task to handle datagrams from client """ """ Task to handle datagrams from client """
# Run this task just after the listener poll task # Run this task just after the listener poll task
taskMgr.add(self.readerPollTask, "serverReaderPollTask", -39) if clusterSyncFlag:
# Sync version
taskMgr.add(self._syncReaderPollTask, "serverReaderPollTask", -39)
else:
# Asynchronous version
taskMgr.add(self._readerPollTask, "serverReaderPollTask", -39)
def readerPollTask(self, state): def _readerPollTask(self, state):
""" Non blocking task to read all available datagrams """ """ Non blocking task to read all available datagrams """
while 1: while 1:
(datagram, dgi,type) = self.msgHandler.nonBlockingRead(self.qcr) (datagram, dgi,type) = self.msgHandler.nonBlockingRead(self.qcr)
@ -73,6 +113,41 @@ class ClusterServer(DirectObject.DirectObject):
self.handleDatagram(dgi, type) self.handleDatagram(dgi, type)
return Task.cont return Task.cont
def _syncReaderPollTask(self, task):
if self.lastConnection is None:
pass
elif self.qcr.isConnectionOk(self.lastConnection):
# Process datagrams till you get a postion update
type = CLUSTER_NONE
while type != CLUSTER_CAM_MOVEMENT:
# Block until you get a new datagram
(datagram,dgi,type) = self.msgHandler.blockingRead(self.qcr)
# Process datagram
self.handleDatagram(dgi,type)
return Task.cont
def startSwapCoordinator(self):
taskMgr.add(self.swapCoordinatorTask, "serverSwapCoordinator", 51)
def swapCoordinatorTask(self, task):
if self.fPosReceived:
self.fPosReceived = 0
# Alert client that this server is ready to swap
self.sendSwapReady()
# Wait for swap command (processing any intermediate datagrams)
while 1:
(datagram,dgi,type) = self.msgHandler.blockingRead(self.qcr)
self.handleDatagram(dgi,type)
if type == CLUSTER_SWAP_NOW:
break
return Task.cont
def sendSwapReady(self):
self.notify.debug(
'send swap ready packet %d' % self.msgHandler.packetNumber)
datagram = self.msgHandler.makeSwapReadyDatagram()
self.cw.send(datagram, self.lastConnection)
def handleDatagram(self, dgi, type): def handleDatagram(self, dgi, type):
""" Process a datagram depending upon type flag """ """ Process a datagram depending upon type flag """
if (type == CLUSTER_NONE): if (type == CLUSTER_NONE):
@ -130,55 +205,3 @@ class ClusterServer(DirectObject.DirectObject):
command = self.msgHandler.parseCommandStringDatagram(dgi) command = self.msgHandler.parseCommandStringDatagram(dgi)
exec( command, globals() ) exec( command, globals() )
class ClusterServerSync(ClusterServer):
def __init__(self,cameraJig,camera):
self.notify.info('starting ClusterServerSync')
ClusterServer.__init__(self,cameraJig,camera)
self.startSwapCoordinator()
def readerPollTask(self, task):
if self.lastConnection is None:
pass
elif self.qcr.isConnectionOk(self.lastConnection):
# Process datagrams till you get a postion update
type = CLUSTER_NONE
while type != CLUSTER_CAM_MOVEMENT:
# Block until you get a new datagram
(datagram,dgi,type) = self.msgHandler.blockingRead(self.qcr)
# Process datagram
self.handleDatagram(dgi,type)
return Task.cont
def sendSwapReady(self):
self.notify.debug(
'send swap ready packet %d' % self.msgHandler.packetNumber)
datagram = self.msgHandler.makeSwapReadyDatagram()
self.cw.send(datagram, self.lastConnection)
def startSwapCoordinator(self):
taskMgr.add(self.swapCoordinatorTask, "serverSwapCoordinator", 51)
def swapCoordinatorTask(self, task):
if self.fPosReceived:
self.fPosReceived = 0
# Alert client that this server is ready to swap
self.sendSwapReady()
# Wait for swap command (processing any intermediate datagrams)
while 1:
(datagram,dgi,type) = self.msgHandler.blockingRead(self.qcr)
self.handleDatagram(dgi,type)
if type == CLUSTER_SWAP_NOW:
break
return Task.cont

View File

@ -259,7 +259,6 @@ class DirectCameraControl(PandaObject):
def mouseRollTask(self, state): def mouseRollTask(self, state):
wrtMat = state.wrtMat wrtMat = state.wrtMat
print wrtMat
angle = getCrankAngle(state.coaCenter) angle = getCrankAngle(state.coaCenter)
deltaAngle = angle - state.lastAngle deltaAngle = angle - state.lastAngle
state.lastAngle = angle state.lastAngle = angle

View File

@ -170,15 +170,9 @@ class DirectSession(PandaObject):
if self.clusterMode == 'client': if self.clusterMode == 'client':
self.cluster = createClusterClient() self.cluster = createClusterClient()
elif self.clusterMode == 'server': elif self.clusterMode == 'server':
if base.config.GetBool('cluster-sync',0): self.cluster = ClusterServer(base.cameraList[0], base.camList[0])
self.cluster = ClusterServerSync(base.cameraList[0],
base.camList[0])
base.win.setSync(1)
else:
self.cluster = ClusterServer(base.cameraList[0],
base.camList[0])
else: else:
self.cluster = None self.cluster = DummyClusterClient()
def enable(self): def enable(self):
# Make sure old tasks are shut down # Make sure old tasks are shut down