from PandaModules import * import Task import DirectNotifyGlobal import DirectObject from PyDatagram import PyDatagram import types class ConnectionRepository(DirectObject.DirectObject): """ This is a base class for things that know how to establish a connection (and exchange datagrams) with a gameserver. This includes ClientRepository and AIRepository. """ notify = DirectNotifyGlobal.directNotify.newCategory("ConnectionRepository") taskPriority = -30 def __init__(self, config): DirectObject.DirectObject.__init__(self) self.config = config # Set this to 'http' to establish a connection to the server # using the HTTPClient interface, which ultimately uses the # OpenSSL socket library (even though SSL is not involved). # This is not as robust a socket library as NSPR's, but the # HTTPClient interface does a good job of negotiating the # connection over an HTTP proxy if one is in use. # Set it to 'nspr' to use Panda's net interface # (e.g. QueuedConnectionManager, etc.) to establish the # connection, which ultimately uses the NSPR socket library. # This is a much better socket library, but it may be more # than you need for most applications; and there is no support # for proxies. # Set it to 'default' to use the HTTPClient interface if a # proxy is in place, but the NSPR interface if we don't have a # proxy. self.connectMethod = self.config.GetString('connect-method', 'default') self.connectHttp = None self.http = None self.qcm = None self.cw = None self.tcpConn = None self.recorder = None # Reader statistics self.rsDatagramCount = 0 self.rsUpdateObjs = {} self.rsLastUpdate = 0 self.rsDoReport = self.config.GetBool('reader-statistics', 0) self.rsUpdateInterval = self.config.GetDouble('reader-statistics-interval', 10) def readDCFile(self, dcFileNames = None): """ Reads in the dc files listed in dcFileNames, or if dcFileNames is None, reads in all of the dc files listed in the Configrc file. The resulting DCFile object is stored in self.dcFile. """ self.dcFile = DCFile() self.dclassesByName = {} self.dclassesByNumber = {} dcImports = {} if dcFileNames == None: readResult = self.dcFile.readAll() if not readResult: self.notify.error("Could not read dc file.") else: for dcFileName in dcFileNames: readResult = self.dcFile.read(Filename(dcFileName)) if not readResult: self.notify.error("Could not read dc file: %s" % (dcFileName)) self.hashVal = self.dcFile.getHash() # Now import all of the modules required by the DC file. for n in range(self.dcFile.getNumImportModules()): moduleName = self.dcFile.getImportModule(n) moduleName = self.mangleDCName(moduleName) module = __import__(moduleName, globals(), locals()) if self.dcFile.getNumImportSymbols(n) > 0: # "from moduleName import symbolName, symbolName, ..." # Copy just the named symbols into the dictionary. for i in range(self.dcFile.getNumImportSymbols(n)): symbolName = self.dcFile.getImportSymbol(n, i) if symbolName == '*': # Get all symbols. dcImports.update(module.__dict__) else: mangledName = self.mangleName(symbolName) gotAny = 0 if hasattr(module, symbolName): dcImports[symbolName] = getattr(module, symbolName) gotAny = 1 if hasattr(module, mangledName): dcImports[mangledName] = getattr(module, mangledName) gotAny = 1 if not gotAny: self.notify.error("Symbol %s not found in module %s." % ( symbolName, moduleName)) else: # "import moduleName" # Copy the module itself into the dictionary. dcImports[moduleName] = module # Now get the class definition for the classes named in the DC # file. for i in range(self.dcFile.getNumClasses()): dclass = self.dcFile.getClass(i) number = dclass.getNumber() className = dclass.getName() className = self.mangleDCName(className) # Does the class have a definition defined in the newly # imported namespace? classDef = dcImports.get(className) if classDef == None: self.notify.info("No class definition for %s." % (className)) else: if type(classDef) == types.ModuleType: if not hasattr(classDef, className): self.notify.error("Module %s does not define class %s." % (className, className)) classDef = getattr(classDef, className) if type(classDef) != types.ClassType: self.notify.error("Symbol %s is not a class name." % (className)) else: dclass.setClassDef(classDef) self.dclassesByName[className] = dclass self.dclassesByNumber[number] = dclass def mangleDCName(self, name): """ This is provided as a hook so that derived classes (e.g. the AIRepository) can rename symbols from the .dc file according to the conventions associated with this particular repository (e.g., an AIRepository appends 'AI' to class and module names).""" return name def connect(self, serverList, successCallback = None, successArgs = [], failureCallback = None, failureArgs = []): """ Attempts to establish a connection to the server. May return before the connection is established. The two callbacks represent the two functions to call (and their arguments) on success or failure, respectively. The failure callback also gets one additional parameter, which will be passed in first: the return status code giving reason for failure, if it is known. """ if self.recorder and self.recorder.isPlaying(): # If we have a recorder and it's already in playback mode, # don't actually attempt to connect to a gameserver since # we don't need to. Just let it play back the data. self.notify.info("Not connecting to gameserver; using playback data instead.") self.connectHttp = 1 self.tcpConn = SocketStreamRecorder() self.recorder.addRecorder('gameserver', self.tcpConn) self.startReaderPollTask() if successCallback: successCallback(*successArgs) return hasProxy = 0 if self.checkHttp(): proxies = self.http.getProxiesForUrl(serverList[0]) hasProxy = (proxies != 'DIRECT') if hasProxy: self.notify.info("Connecting to gameserver via proxy list: %s" % (proxies)) else: self.notify.info("Connecting to gameserver directly (no proxy)."); if self.connectMethod == 'http': self.connectHttp = 1 elif self.connectMethod == 'nspr': self.connectHttp = 0 else: self.connectHttp = (hasProxy or serverList[0].isSsl()) self.bootedIndex = None self.bootedText = None if self.connectHttp: # In the HTTP case, we can't just iterate through the list # of servers, because each server attempt requires # spawning a request and then coming back later to check # the success or failure. Instead, we start the ball # rolling by calling the connect callback, which will call # itself repeatedly until we establish a connection (or # run out of servers). ch = self.http.makeChannel(0) self.httpConnectCallback(ch, serverList, 0, successCallback, successArgs, failureCallback, failureArgs) else: if self.qcm == None: self.qcm = QueuedConnectionManager() if self.cw == None: self.cw = ConnectionWriter(self.qcm, 0) self.qcr = QueuedConnectionReader(self.qcm, 0) minLag = self.config.GetFloat('min-lag', 0.) maxLag = self.config.GetFloat('max-lag', 0.) if minLag or maxLag: self.qcr.startDelay(minLag, maxLag) # A big old 20 second timeout. gameServerTimeoutMs = self.config.GetInt("game-server-timeout-ms", 20000) # Try each of the servers in turn. for url in serverList: self.notify.info("Connecting to %s via NSPR interface." % (url.cStr())) self.tcpConn = self.qcm.openTCPClientConnection( url.getServer(), url.getPort(), gameServerTimeoutMs) if self.tcpConn: self.tcpConn.setNoDelay(1) self.qcr.addConnection(self.tcpConn) self.startReaderPollTask() if successCallback: successCallback(*successArgs) return # Failed to connect. if failureCallback: failureCallback(0, '', *failureArgs) def disconnect(self): """Closes the previously-established connection. """ self.notify.info("Closing connection to server.") if self.tcpConn != None: if self.connectHttp: self.tcpConn.close() else: self.qcm.closeConnection(self.tcpConn) self.tcpConn = None self.stopReaderPollTask() def httpConnectCallback(self, ch, serverList, serverIndex, successCallback, successArgs, failureCallback, failureArgs): if ch.isConnectionReady(): self.tcpConn = ch.getConnection() self.tcpConn.userManagesMemory = 1 if self.recorder: # If we have a recorder, we wrap the connect inside a # SocketStreamRecorder, which will trap incoming data # when the recorder is set to record mode. (It will # also play back data when the recorder is in playback # mode, but in that case we never get this far in the # code, since we just create an empty # SocketStreamRecorder without actually connecting to # the gameserver.) stream = SocketStreamRecorder(self.tcpConn, 1) self.recorder.addRecorder('gameserver', stream) # In this case, we pass ownership of the original # connection to the SocketStreamRecorder object. self.tcpConn.userManagesMemory = 0 self.tcpConn = stream self.startReaderPollTask() if successCallback: successCallback(*successArgs) elif serverIndex < len(serverList): # No connection yet, but keep trying. url = serverList[serverIndex] self.notify.info("Connecting to %s via HTTP interface." % (url.cStr())) ch.preserveStatus() ch.beginConnectTo(DocumentSpec(url)) ch.spawnTask(name = 'connect-to-server', callback = self.httpConnectCallback, extraArgs = [ch, serverList, serverIndex + 1, successCallback, successArgs, failureCallback, failureArgs]) else: # No more servers to try; we have to give up now. if failureCallback: failureCallback(ch.getStatusCode(), ch.getStatusString(), *failureArgs) def checkHttp(self): # Creates an HTTPClient, if possible, if we don't have one # already. This might fail if the OpenSSL library isn't # available. Returns the HTTPClient (also self.http), or None # if not set. if self.http == None: try: self.http = HTTPClient() except: pass return self.http def startReaderPollTask(self): # Stop any tasks we are running now self.stopReaderPollTask() taskMgr.add(self.readerPollUntilEmpty, "readerPollTask", priority = self.taskPriority) def stopReaderPollTask(self): taskMgr.remove("readerPollTask") def readerPollUntilEmpty(self, task): while self.readerPollOnce(): pass return Task.cont def readerPollOnce(self): # we simulate the network plug being pulled by setting tcpConn # to None; enforce that condition if not self.tcpConn: return 0 # Make sure any recently-sent datagrams are flushed when the # time expires, if we're in collect-tcp mode. self.tcpConn.considerFlush() if self.rsDoReport: self.reportReaderStatistics() if self.connectHttp: datagram = PyDatagram() if self.tcpConn.receiveDatagram(datagram): if self.rsDoReport: self.rsDatagramCount += 1 self.handleDatagram(datagram) return 1 # Unable to receive a datagram: did we lose the connection? if self.tcpConn.isClosed(): self.tcpConn = None self.stopReaderPollTask() self.lostConnection() return 0 else: self.ensureValidConnection() if self.qcr.dataAvailable(): datagram = NetDatagram() if self.qcr.getData(datagram): if self.rsDoReport: self.rsDatagramCount += 1 self.handleDatagram(datagram) return 1 return 0 def flush(self): # Ensure the latest has been sent to the server. if self.tcpConn: self.tcpConn.flush() def ensureValidConnection(self): # Was the connection reset? if self.connectHttp: pass else: if self.qcm.resetConnectionAvailable(): resetConnectionPointer = PointerToConnection() if self.qcm.getResetConnection(resetConnectionPointer): resetConn = resetConnectionPointer.p() self.qcm.closeConnection(resetConn) # if we've simulated a network plug pull, restore the # simulated plug self.restoreNetworkPlug() if self.tcpConn.this == resetConn.this: self.tcpConn = None self.stopReaderPollTask() self.lostConnection() else: self.notify.warning("Lost unknown connection.") def lostConnection(self): # This should be overrided by a derived class to handle an # unexpectedly lost connection to the gameserver. self.notify.warning("Lost connection to gameserver.") def handleDatagram(self, datagram): # This class is meant to be pure virtual, and any classes that # inherit from it need to make their own handleDatagram method pass def reportReaderStatistics(self): now = globalClock.getRealTime() if now - self.rsLastUpdate < self.rsUpdateInterval: return self.rsLastUpdate = now self.notify.info("Received %s datagrams" % (self.rsDatagramCount)) if self.rsUpdateObjs: self.notify.info("Updates: %s" % (self.rsUpdateObjs)) self.rsDatagramCount = 0 self.rsUpdateObjs = {} def send(self, datagram): #if self.notify.getDebug(): # print "ConnectionRepository sending datagram:" # datagram.dumpHex(ostream) if not self.tcpConn: self.notify.warning("Unable to send message after connection is closed.") return if self.connectHttp: if not self.tcpConn.sendDatagram(datagram): self.notify.warning("Could not send datagram.") else: self.cw.send(datagram, self.tcpConn) # debugging funcs for simulating a network-plug-pull def pullNetworkPlug(self): self.restoreNetworkPlug() self.notify.warning('*** SIMULATING A NETWORK-PLUG-PULL ***') self.hijackedTcpConn = self.tcpConn self.tcpConn = None def networkPlugPulled(self): return hasattr(self, 'hijackedTcpConn') def restoreNetworkPlug(self): if self.networkPlugPulled(): self.notify.info('*** RESTORING SIMULATED PULLED-NETWORK-PLUG ***') self.tcpConn = self.hijackedTcpConn del self.hijackedTcpConn def doFind(self, str): """ returns list of distributed objects with matching str in value """ for value in self.doId2do.values(): if `value`.find(str) >= 0: return value def doFindAll(self, str): """ returns list of distributed objects with matching str in value """ matches = [] for value in self.doId2do.values(): if `value`.find(str) >= 0: matches.append(value) return matches