Implemented threaded filesystem saving.

(cherry picked from commit f129282)
This commit is contained in:
Vexatos 2018-02-16 01:32:12 +01:00 committed by payonel
parent db18c1df62
commit a8a421e25a
3 changed files with 96 additions and 5 deletions

View File

@ -9,6 +9,7 @@ import cpw.mods.fml.common.network.FMLEventChannel
import li.cil.oc.common.IMC import li.cil.oc.common.IMC
import li.cil.oc.common.Proxy import li.cil.oc.common.Proxy
import li.cil.oc.server.command.CommandHandler import li.cil.oc.server.command.CommandHandler
import li.cil.oc.server.fs.BufferedFileSaveHandler
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger import org.apache.logging.log4j.Logger
@ -54,7 +55,15 @@ object OpenComputers {
def missingMappings(e: FMLMissingMappingsEvent) = proxy.missingMappings(e) def missingMappings(e: FMLMissingMappingsEvent) = proxy.missingMappings(e)
@EventHandler @EventHandler
def serverStart(e: FMLServerStartingEvent) = CommandHandler.register(e) def serverStart(e: FMLServerStartingEvent): Unit = {
CommandHandler.register(e)
BufferedFileSaveHandler.newThreadPool()
}
@EventHandler
def serverStop(e: FMLServerStoppedEvent): Unit = {
BufferedFileSaveHandler.waitForSaving()
}
@EventHandler @EventHandler
def imc(e: IMCEvent) = IMC.handleEvent(e) def imc(e: IMCEvent) = IMC.handleEvent(e)

View File

@ -2,7 +2,12 @@ package li.cil.oc.server.fs
import java.io import java.io
import java.io.FileNotFoundException import java.io.FileNotFoundException
import java.util.concurrent.CancellationException
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import li.cil.oc.OpenComputers
import li.cil.oc.api.fs.Mode import li.cil.oc.api.fs.Mode
import net.minecraft.nbt.NBTTagCompound import net.minecraft.nbt.NBTTagCompound
import org.apache.commons.io.FileUtils import org.apache.commons.io.FileUtils
@ -34,13 +39,26 @@ trait Buffered extends OutputStreamFileSystem {
// ----------------------------------------------------------------------- // // ----------------------------------------------------------------------- //
override def load(nbt: NBTTagCompound) = { private var saving: Option[Future[_]] = None
override def load(nbt: NBTTagCompound): Unit = {
saving.foreach(f => try {
f.get(120L, TimeUnit.SECONDS)
} catch {
case e: TimeoutException => OpenComputers.log.warn("Waiting for filesystem to save took two minutes! Aborting.")
case e: CancellationException => // NO-OP
})
loadFiles(nbt)
super.load(nbt)
}
private def loadFiles(nbt: NBTTagCompound): Unit = this.synchronized {
def recurse(path: String, directory: io.File) { def recurse(path: String, directory: io.File) {
makeDirectory(path) makeDirectory(path)
for (child <- directory.listFiles() if FileSystem.isValidFilename(child.getName)) { for (child <- directory.listFiles() if FileSystem.isValidFilename(child.getName)) {
val childPath = path + child.getName val childPath = path + child.getName
val childFile = new io.File(directory, child.getName) val childFile = new io.File(directory, child.getName)
if (child.exists() && child.isDirectory && child.list() != null) { if (child.exists() && child .isDirectory && child.list() != null) {
recurse(childPath + "/", childFile) recurse(childPath + "/", childFile)
} }
else if (!exists(childPath) || !isDirectory(childPath)) { else if (!exists(childPath) || !isDirectory(childPath)) {
@ -74,13 +92,14 @@ trait Buffered extends OutputStreamFileSystem {
fileRoot.delete() fileRoot.delete()
} }
else recurse("", fileRoot) else recurse("", fileRoot)
super.load(nbt)
} }
override def save(nbt: NBTTagCompound) = { override def save(nbt: NBTTagCompound) = {
super.save(nbt) super.save(nbt)
saving = BufferedFileSaveHandler.scheduleSave(this)
}
def saveFiles(): Unit = this.synchronized {
for ((path, time) <- deletions) { for ((path, time) <- deletions) {
val file = new io.File(fileRoot, path) val file = new io.File(fileRoot, path)
if (FileUtils.isFileOlder(file, time)) if (FileUtils.isFileOlder(file, time))

View File

@ -0,0 +1,63 @@
package li.cil.oc.server.fs
import java.util.concurrent.Future
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import li.cil.oc.OpenComputers
import li.cil.oc.util.ThreadPoolFactory
object BufferedFileSaveHandler {
private var _threadPool: ScheduledExecutorService = _
private def withPool(f: ScheduledExecutorService => Future[_], requiresPool: Boolean = true): Option[Future[_]] = {
if (_threadPool == null) {
OpenComputers.log.warn("Error handling file saving: Did the server never start?")
if (requiresPool) {
OpenComputers.log.warn("Creating new thread pool.")
newThreadPool()
} else {
return None
}
} else if (_threadPool.isShutdown || _threadPool.isTerminated) {
OpenComputers.log.warn("Error handling file saving: Thread pool shut down!")
if (requiresPool) {
OpenComputers.log.warn("Creating new thread pool.")
newThreadPool()
} else {
return None
}
}
Option(f(_threadPool))
}
def newThreadPool(): Unit = {
if (_threadPool != null && !_threadPool.isTerminated) {
_threadPool.shutdownNow()
}
_threadPool = ThreadPoolFactory.create("FileSystem", 1)
}
def scheduleSave(fs: Buffered): Option[Future[_]] = withPool(threadPool => threadPool.submit(new Runnable {
override def run(): Unit = fs.saveFiles()
}))
def waitForSaving(): Unit = withPool(threadPool => {
try {
threadPool.shutdown()
var terminated = threadPool.awaitTermination(15, TimeUnit.SECONDS)
if (!terminated) {
OpenComputers.log.warn("Warning: Saving the filesystem has already taken 15 seconds!")
terminated = threadPool.awaitTermination(105, TimeUnit.SECONDS)
if (!terminated) {
OpenComputers.log.error("Warning: Saving the filesystem has already taken two minutes! Aborting")
threadPool.shutdownNow()
}
}
} catch {
case e: InterruptedException => e.printStackTrace()
}
null
}, requiresPool = false)
}