split out common code from ClientRepository and AIRepository into ConnectionRepository

This commit is contained in:
David Rose 2003-07-05 20:02:32 +00:00
parent 38a95c373e
commit 7049173fb3
3 changed files with 319 additions and 291 deletions

View File

@ -1,25 +1,20 @@
"""ClientRepository module: contains the ClientRepository class"""
from PandaModules import *
from TaskManagerGlobal import *
from MsgTypes import *
from ShowBaseGlobal import *
import Task
import DirectNotifyGlobal
import ClientDistClass
import CRCache
# The repository must import all known types of Distributed Objects
#import DistributedObject
#import DistributedToon
import DirectObject
import ConnectionRepository
import PythonUtil
class ClientRepository(DirectObject.DirectObject):
class ClientRepository(ConnectionRepository.ConnectionRepository):
notify = DirectNotifyGlobal.directNotify.newCategory("ClientRepository")
TASK_PRIORITY = -30
def __init__(self, dcFileName):
ConnectionRepository.ConnectionRepository.__init__(self, base.config)
self.number2cdc={}
self.name2cdc={}
self.doId2do={}
@ -27,42 +22,10 @@ class ClientRepository(DirectObject.DirectObject):
self.parseDcFile(dcFileName)
self.cache=CRCache.CRCache()
self.serverDelta = 0
# Set this to 'http' to establish a connection to the server
# using the HTTPClient interface, which ultimately uses the
# OpenSSL socket library (even though SSL is not involved).
# This is not as robust a socket library as NSPR's, but the
# HTTPClient interface does a good job of negotiating the
# connection over an HTTP proxy if one is in use.
# Set it to 'nspr' to use Panda's net interface
# (e.g. QueuedConnectionManager, etc.) to establish the
# connection, which ultimately uses the NSPR socket library.
# This is a much better socket library, but it may be more
# than you need for most applications; and the proxy support
# is weak.
# Set it to 'default' to use the HTTPClient interface if a
# proxy is in place, but the NSPR interface if we don't have a
# proxy.
self.connectMethod = base.config.GetString('connect-method', 'default')
self.connectHttp = None
self.qcm = None
self.bootedIndex = None
self.bootedText = None
self.tcpConn = None
# Reader statistics
self.rsDatagramCount = 0
self.rsUpdateObjs = {}
self.rsLastUpdate = 0
self.rsDoReport = base.config.GetBool('reader-statistics', 0)
self.rsUpdateInterval = base.config.GetDouble('reader-statistics-interval', 10)
return None
def setServerDelta(self, delta):
"""
Indicates the approximate difference in seconds between the
@ -98,223 +61,6 @@ class ClientRepository(DirectObject.DirectObject):
self.name2cdc[dcClass.getName()]=clientDistClass
return None
def connect(self, serverList, allowProxy,
successCallback = None, successArgs = [],
failureCallback = None, failureArgs = []):
"""
Attempts to establish a connection to the server. May return
before the connection is established. The two callbacks
represent the two functions to call (and their arguments) on
success or failure, respectively. The failure callback also
gets one additional parameter, which will be passed in first:
the return status code giving reason for failure, if it is
known.
"""
hasProxy = 0
if allowProxy:
proxies = self.http.getProxiesForUrl(serverList[0])
hasProxy = (proxies != '')
if hasProxy:
self.notify.info("Connecting to gameserver via proxy: %s" % (proxies))
else:
self.notify.info("Connecting to gameserver directly (no proxy).");
if self.connectMethod == 'http':
self.connectHttp = 1
elif self.connectMethod == 'nspr':
self.connectHttp = 0
else:
self.connectHttp = (hasProxy or serverList[0].isSsl())
self.bootedIndex = None
self.bootedText = None
if self.connectHttp:
# In the HTTP case, we can't just iterate through the list
# of servers, because each server attempt requires
# spawning a request and then coming back later to check
# the success or failure. Instead, we start the ball
# rolling by calling the connect callback, which will call
# itself repeatedly until we establish a connection (or
# run out of servers).
ch = self.http.makeChannel(0)
# Temporary try..except for old Pandas.
try:
ch.setAllowProxy(allowProxy)
except:
pass
self.httpConnectCallback(ch, serverList, 0, hasProxy,
successCallback, successArgs,
failureCallback, failureArgs)
else:
if self.qcm == None:
self.qcm = QueuedConnectionManager()
self.cw = ConnectionWriter(self.qcm, 0)
self.qcr = QueuedConnectionReader(self.qcm, 0)
minLag = config.GetFloat('min-lag', 0.)
maxLag = config.GetFloat('max-lag', 0.)
if minLag or maxLag:
self.qcr.startDelay(minLag, maxLag)
# A big old 20 second timeout.
gameServerTimeoutMs = base.config.GetInt("game-server-timeout-ms",
20000)
# Try each of the servers in turn.
for url in serverList:
self.notify.info("Connecting to %s via NSPR interface." % (url.cStr()))
self.tcpConn = self.qcm.openTCPClientConnection(
url.getServer(), url.getPort(),
gameServerTimeoutMs)
if self.tcpConn:
self.tcpConn.setNoDelay(1)
self.qcr.addConnection(self.tcpConn)
self.startReaderPollTask()
if successCallback:
successCallback(*successArgs)
return
# Failed to connect.
if failureCallback:
failureCallback(hasProxy, 0, *failureArgs)
def disconnect(self):
"""Closes the previously-established connection.
"""
self.notify.info("Closing connection to server.")
if self.tcpConn != None:
if self.connectHttp:
self.tcpConn.close()
else:
self.qcm.closeConnection(self.tcpConn)
self.tcpConn = None
self.stopReaderPollTask()
def httpConnectCallback(self, ch, serverList, serverIndex, hasProxy,
successCallback, successArgs,
failureCallback, failureArgs):
if ch.isConnectionReady():
self.tcpConn = ch.getConnection()
self.tcpConn.userManagesMemory = 1
self.startReaderPollTask()
if successCallback:
successCallback(*successArgs)
elif serverIndex < len(serverList):
# No connection yet, but keep trying.
url = serverList[serverIndex]
self.notify.info("Connecting to %s via HTTP interface." % (url.cStr()))
ch.beginConnectTo(DocumentSpec(url))
ch.spawnTask(name = 'connect-to-server',
callback = self.httpConnectCallback,
extraArgs = [ch, serverList, serverIndex + 1,
hasProxy,
successCallback, successArgs,
failureCallback, failureArgs])
else:
# No more servers to try; we have to give up now.
if failureCallback:
failureCallback(hasProxy, ch.getStatusCode(), *failureArgs)
def startReaderPollTask(self):
# Stop any tasks we are running now
self.stopReaderPollTask()
taskMgr.add(self.readerPollUntilEmpty, "readerPollTask",
priority=self.TASK_PRIORITY)
return None
def stopReaderPollTask(self):
taskMgr.remove("readerPollTask")
return None
def readerPollUntilEmpty(self, task):
while self.readerPollOnce():
pass
return Task.cont
def readerPollOnce(self):
# we simulate the network plug being pulled by setting tcpConn
# to None; enforce that condition
if not self.tcpConn:
return 0
# Make sure any recently-sent datagrams are flushed when the
# time expires, if we're in collect-tcp mode.
self.tcpConn.considerFlush()
if self.rsDoReport:
self.reportReaderStatistics()
if self.connectHttp:
datagram = Datagram()
if self.tcpConn.receiveDatagram(datagram):
if self.rsDoReport:
self.rsDatagramCount += 1
self.handleDatagram(datagram)
return 1
# Unable to receive a datagram: did we lose the connection?
if self.tcpConn.isClosed():
self.tcpConn = None
self.stopReaderPollTask()
self.loginFSM.request("noConnection")
return 0
else:
self.ensureValidConnection()
if self.qcr.dataAvailable():
datagram = NetDatagram()
if self.qcr.getData(datagram):
if self.rsDoReport:
self.rsDatagramCount += 1
self.handleDatagram(datagram)
return 1
return 0
def ensureValidConnection(self):
# Was the connection reset?
if self.connectHttp:
pass
else:
if self.qcm.resetConnectionAvailable():
resetConnectionPointer = PointerToConnection()
if self.qcm.getResetConnection(resetConnectionPointer):
resetConn = resetConnectionPointer.p()
self.qcm.closeConnection(resetConn)
# if we've simulated a network plug pull, restore the
# simulated plug
self.restoreNetworkPlug()
if self.tcpConn.this == resetConn.this:
self.tcpConn = None
self.stopReaderPollTask()
self.loginFSM.request("noConnection")
else:
self.notify.warning("Lost unknown connection.")
return None
def handleDatagram(self, datagram):
# This class is meant to be pure virtual, and any classes that
# inherit from it need to make their own handleDatagram method
pass
def reportReaderStatistics(self):
now = globalClock.getRealTime()
if now - self.rsLastUpdate < self.rsUpdateInterval:
return
self.rsLastUpdate = now
self.notify.info("Received %s datagrams" % (self.rsDatagramCount))
if self.rsUpdateObjs:
self.notify.info("Updates: %s" % (self.rsUpdateObjs))
self.rsDatagramCount = 0
self.rsUpdateObjs = {}
def handleGenerateWithRequired(self, di):
# Get the class Id
classId = di.getArg(STUint16);
@ -618,23 +364,6 @@ class ClientRepository(DirectObject.DirectObject):
# Let the cdc finish the job
cdc.sendUpdate(self, do, fieldName, args, sendToId)
def send(self, datagram):
if self.notify.getDebug():
print "ClientRepository sending datagram:"
datagram.dumpHex(ostream)
print "Caller is %s, line %s, func '%s'" % PythonUtil.callerInfo()
if not self.tcpConn:
self.notify.warning("Unable to send message after connection is closed.")
return
if self.connectHttp:
if not self.tcpConn.sendDatagram(datagram):
self.notify.warning("Could not send datagram.")
else:
self.cw.send(datagram, self.tcpConn)
return None
def replaceMethod(self, oldMethod, newFunction):
foundIt = 0
import new
@ -654,22 +383,6 @@ class ClientRepository(DirectObject.DirectObject):
foundIt = 1
return foundIt
# debugging funcs for simulating a network-plug-pull
def pullNetworkPlug(self):
self.restoreNetworkPlug()
self.notify.warning('*** SIMULATING A NETWORK-PLUG-PULL ***')
self.hijackedTcpConn = self.tcpConn
self.tcpConn = None
def networkPlugPulled(self):
return hasattr(self, 'hijackedTcpConn')
def restoreNetworkPlug(self):
if self.networkPlugPulled():
self.notify.info('*** RESTORING SIMULATED PULLED-NETWORK-PLUG ***')
self.tcpConn = self.hijackedTcpConn
del self.hijackedTcpConn
def getAllOfType(self, type):
# Returns a list of all DistributedObjects in the repository
# of a particular type.

View File

@ -0,0 +1,312 @@
from PandaModules import *
import Task
import DirectNotifyGlobal
import DirectObject
class ConnectionRepository(DirectObject.DirectObject):
"""
This is a base class for things that know how to establish a
connection (and exchange datagrams) with a gameserver. This
includes ClientRepository and AIRepository.
"""
notify = DirectNotifyGlobal.directNotify.newCategory("ConnectionRepository")
taskPriority = -30
def __init__(self, config):
DirectObject.DirectObject.__init__(self)
self.config = config
# Set this to 'http' to establish a connection to the server
# using the HTTPClient interface, which ultimately uses the
# OpenSSL socket library (even though SSL is not involved).
# This is not as robust a socket library as NSPR's, but the
# HTTPClient interface does a good job of negotiating the
# connection over an HTTP proxy if one is in use.
# Set it to 'nspr' to use Panda's net interface
# (e.g. QueuedConnectionManager, etc.) to establish the
# connection, which ultimately uses the NSPR socket library.
# This is a much better socket library, but it may be more
# than you need for most applications; and the proxy support
# is weak.
# Set it to 'default' to use the HTTPClient interface if a
# proxy is in place, but the NSPR interface if we don't have a
# proxy.
self.connectMethod = self.config.GetString('connect-method', 'default')
self.connectHttp = None
self.http = None
self.qcm = None
self.cw = None
self.tcpConn = None
# Reader statistics
self.rsDatagramCount = 0
self.rsUpdateObjs = {}
self.rsLastUpdate = 0
self.rsDoReport = self.config.GetBool('reader-statistics', 0)
self.rsUpdateInterval = self.config.GetDouble('reader-statistics-interval', 10)
def connect(self, serverList, allowProxy,
successCallback = None, successArgs = [],
failureCallback = None, failureArgs = []):
"""
Attempts to establish a connection to the server. May return
before the connection is established. The two callbacks
represent the two functions to call (and their arguments) on
success or failure, respectively. The failure callback also
gets one additional parameter, which will be passed in first:
the return status code giving reason for failure, if it is
known.
"""
hasProxy = 0
if allowProxy:
if self.http == None:
self.http = HTTPClient()
proxies = self.http.getProxiesForUrl(serverList[0])
hasProxy = (proxies != '')
if hasProxy:
self.notify.info("Connecting to gameserver via proxy: %s" % (proxies))
else:
self.notify.info("Connecting to gameserver directly (no proxy).");
if self.connectMethod == 'http':
self.connectHttp = 1
elif self.connectMethod == 'nspr':
self.connectHttp = 0
else:
self.connectHttp = (hasProxy or serverList[0].isSsl())
self.bootedIndex = None
self.bootedText = None
if self.connectHttp:
# In the HTTP case, we can't just iterate through the list
# of servers, because each server attempt requires
# spawning a request and then coming back later to check
# the success or failure. Instead, we start the ball
# rolling by calling the connect callback, which will call
# itself repeatedly until we establish a connection (or
# run out of servers).
if self.http == None:
self.http = HTTPClient()
ch = self.http.makeChannel(0)
# Temporary try..except for old Pandas.
try:
ch.setAllowProxy(allowProxy)
except:
pass
self.httpConnectCallback(ch, serverList, 0, hasProxy,
successCallback, successArgs,
failureCallback, failureArgs)
else:
if self.qcm == None:
self.qcm = QueuedConnectionManager()
if self.cw == None:
self.cw = ConnectionWriter(self.qcm, 0)
self.qcr = QueuedConnectionReader(self.qcm, 0)
minLag = self.config.GetFloat('min-lag', 0.)
maxLag = self.config.GetFloat('max-lag', 0.)
if minLag or maxLag:
self.qcr.startDelay(minLag, maxLag)
# A big old 20 second timeout.
gameServerTimeoutMs = self.config.GetInt("game-server-timeout-ms",
20000)
# Try each of the servers in turn.
for url in serverList:
self.notify.info("Connecting to %s via NSPR interface." % (url.cStr()))
self.tcpConn = self.qcm.openTCPClientConnection(
url.getServer(), url.getPort(),
gameServerTimeoutMs)
if self.tcpConn:
self.tcpConn.setNoDelay(1)
self.qcr.addConnection(self.tcpConn)
self.startReaderPollTask()
if successCallback:
successCallback(*successArgs)
return
# Failed to connect.
if failureCallback:
failureCallback(hasProxy, 0, *failureArgs)
def disconnect(self):
"""Closes the previously-established connection.
"""
self.notify.info("Closing connection to server.")
if self.tcpConn != None:
if self.connectHttp:
self.tcpConn.close()
else:
self.qcm.closeConnection(self.tcpConn)
self.tcpConn = None
self.stopReaderPollTask()
def httpConnectCallback(self, ch, serverList, serverIndex, hasProxy,
successCallback, successArgs,
failureCallback, failureArgs):
if ch.isConnectionReady():
self.tcpConn = ch.getConnection()
self.tcpConn.userManagesMemory = 1
self.startReaderPollTask()
if successCallback:
successCallback(*successArgs)
elif serverIndex < len(serverList):
# No connection yet, but keep trying.
url = serverList[serverIndex]
self.notify.info("Connecting to %s via HTTP interface." % (url.cStr()))
ch.beginConnectTo(DocumentSpec(url))
ch.spawnTask(name = 'connect-to-server',
callback = self.httpConnectCallback,
extraArgs = [ch, serverList, serverIndex + 1,
hasProxy,
successCallback, successArgs,
failureCallback, failureArgs])
else:
# No more servers to try; we have to give up now.
if failureCallback:
failureCallback(hasProxy, ch.getStatusCode(), *failureArgs)
def startReaderPollTask(self):
# Stop any tasks we are running now
self.stopReaderPollTask()
taskMgr.add(self.readerPollUntilEmpty, "readerPollTask",
priority = self.taskPriority)
def stopReaderPollTask(self):
taskMgr.remove("readerPollTask")
def readerPollUntilEmpty(self, task):
while self.readerPollOnce():
pass
return Task.cont
def readerPollOnce(self):
# we simulate the network plug being pulled by setting tcpConn
# to None; enforce that condition
if not self.tcpConn:
return 0
# Make sure any recently-sent datagrams are flushed when the
# time expires, if we're in collect-tcp mode.
self.tcpConn.considerFlush()
if self.rsDoReport:
self.reportReaderStatistics()
if self.connectHttp:
datagram = Datagram()
if self.tcpConn.receiveDatagram(datagram):
if self.rsDoReport:
self.rsDatagramCount += 1
self.handleDatagram(datagram)
return 1
# Unable to receive a datagram: did we lose the connection?
if self.tcpConn.isClosed():
self.tcpConn = None
self.stopReaderPollTask()
self.lostConnection()
return 0
else:
self.ensureValidConnection()
if self.qcr.dataAvailable():
datagram = NetDatagram()
if self.qcr.getData(datagram):
if self.rsDoReport:
self.rsDatagramCount += 1
self.handleDatagram(datagram)
return 1
return 0
def ensureValidConnection(self):
# Was the connection reset?
if self.connectHttp:
pass
else:
if self.qcm.resetConnectionAvailable():
resetConnectionPointer = PointerToConnection()
if self.qcm.getResetConnection(resetConnectionPointer):
resetConn = resetConnectionPointer.p()
self.qcm.closeConnection(resetConn)
# if we've simulated a network plug pull, restore the
# simulated plug
self.restoreNetworkPlug()
if self.tcpConn.this == resetConn.this:
self.tcpConn = None
self.stopReaderPollTask()
self.lostConnection()
else:
self.notify.warning("Lost unknown connection.")
def lostConnection(self):
# This should be overrided by a derived class to handle an
# unexpectedly lost connection to the gameserver.
self.notify.warning("Lost connection to gameserver.")
def handleDatagram(self, datagram):
# This class is meant to be pure virtual, and any classes that
# inherit from it need to make their own handleDatagram method
pass
def reportReaderStatistics(self):
now = globalClock.getRealTime()
if now - self.rsLastUpdate < self.rsUpdateInterval:
return
self.rsLastUpdate = now
self.notify.info("Received %s datagrams" % (self.rsDatagramCount))
if self.rsUpdateObjs:
self.notify.info("Updates: %s" % (self.rsUpdateObjs))
self.rsDatagramCount = 0
self.rsUpdateObjs = {}
def send(self, datagram):
if self.notify.getDebug():
print "ConnectionRepository sending datagram:"
datagram.dumpHex(ostream)
if not self.tcpConn:
self.notify.warning("Unable to send message after connection is closed.")
return
if self.connectHttp:
if not self.tcpConn.sendDatagram(datagram):
self.notify.warning("Could not send datagram.")
else:
self.cw.send(datagram, self.tcpConn)
# debugging funcs for simulating a network-plug-pull
def pullNetworkPlug(self):
self.restoreNetworkPlug()
self.notify.warning('*** SIMULATING A NETWORK-PLUG-PULL ***')
self.hijackedTcpConn = self.tcpConn
self.tcpConn = None
def networkPlugPulled(self):
return hasattr(self, 'hijackedTcpConn')
def restoreNetworkPlug(self):
if self.networkPlugPulled():
self.notify.info('*** RESTORING SIMULATED PULLED-NETWORK-PLUG ***')
self.tcpConn = self.hijackedTcpConn
del self.hijackedTcpConn

View File

@ -7,6 +7,9 @@ class DirectObject:
"""
This is the class that all Direct/SAL classes should inherit from
"""
def __init__(self):
pass
#def __del__(self):
# print "Destructing: ", self.__class__.__name__