Offloading state saving into a separate thread.

(cherry picked from commit 0b256e0)
This commit is contained in:
Vexatos 2018-02-16 01:32:12 +01:00 committed by payonel
parent a8a421e25a
commit 4010927a3e
5 changed files with 152 additions and 141 deletions

View File

@ -9,7 +9,7 @@ import cpw.mods.fml.common.network.FMLEventChannel
import li.cil.oc.common.IMC import li.cil.oc.common.IMC
import li.cil.oc.common.Proxy import li.cil.oc.common.Proxy
import li.cil.oc.server.command.CommandHandler import li.cil.oc.server.command.CommandHandler
import li.cil.oc.server.fs.BufferedFileSaveHandler import li.cil.oc.util.ThreadPoolFactory
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger import org.apache.logging.log4j.Logger
@ -57,12 +57,12 @@ object OpenComputers {
@EventHandler @EventHandler
def serverStart(e: FMLServerStartingEvent): Unit = { def serverStart(e: FMLServerStartingEvent): Unit = {
CommandHandler.register(e) CommandHandler.register(e)
BufferedFileSaveHandler.newThreadPool() ThreadPoolFactory.safePools.foreach(_.newThreadPool())
} }
@EventHandler @EventHandler
def serverStop(e: FMLServerStoppedEvent): Unit = { def serverStop(e: FMLServerStoppedEvent): Unit = {
BufferedFileSaveHandler.waitForSaving() ThreadPoolFactory.safePools.foreach(_.waitForCompletion())
} }
@EventHandler @EventHandler

View File

@ -4,6 +4,11 @@ import java.io
import java.io._ import java.io._
import java.nio.file._ import java.nio.file._
import java.nio.file.attribute.BasicFileAttributes import java.nio.file.attribute.BasicFileAttributes
import java.util.concurrent.CancellationException
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import cpw.mods.fml.common.eventhandler.EventPriority import cpw.mods.fml.common.eventhandler.EventPriority
import cpw.mods.fml.common.eventhandler.SubscribeEvent import cpw.mods.fml.common.eventhandler.SubscribeEvent
@ -12,12 +17,13 @@ import li.cil.oc.Settings
import li.cil.oc.api.machine.MachineHost import li.cil.oc.api.machine.MachineHost
import li.cil.oc.api.network.EnvironmentHost import li.cil.oc.api.network.EnvironmentHost
import li.cil.oc.util.BlockPosition import li.cil.oc.util.BlockPosition
import li.cil.oc.util.SafeThreadPool
import li.cil.oc.util.ThreadPoolFactory
import net.minecraft.nbt.CompressedStreamTools import net.minecraft.nbt.CompressedStreamTools
import net.minecraft.nbt.NBTTagCompound import net.minecraft.nbt.NBTTagCompound
import net.minecraft.world.ChunkCoordIntPair import net.minecraft.world.ChunkCoordIntPair
import net.minecraft.world.World import net.minecraft.world.World
import net.minecraftforge.common.DimensionManager import net.minecraftforge.common.DimensionManager
import net.minecraftforge.event.world.ChunkDataEvent
import net.minecraftforge.event.world.WorldEvent import net.minecraftforge.event.world.WorldEvent
import org.apache.commons.lang3.JavaVersion import org.apache.commons.lang3.JavaVersion
import org.apache.commons.lang3.SystemUtils import org.apache.commons.lang3.SystemUtils
@ -42,7 +48,32 @@ object SaveHandler {
// which takes a lot of time and is completely unnecessary in those cases. // which takes a lot of time and is completely unnecessary in those cases.
var savingForClients = false var savingForClients = false
val saveData = mutable.Map.empty[Int, mutable.Map[ChunkCoordIntPair, mutable.Map[String, Array[Byte]]]] class SaveDataEntry(val data: Array[Byte], val pos: ChunkCoordIntPair, val name: String, val dimension: Int) extends Runnable {
override def run(): Unit = {
val path = statePath
val dimPath = new io.File(path, dimension.toString)
val chunkPath = new io.File(dimPath, s"${this.pos.chunkXPos}.${this.pos.chunkZPos}")
chunkDirs.add(chunkPath)
if (!chunkPath.exists()) {
chunkPath.mkdirs()
}
val file = new io.File(chunkPath, this.name)
try {
// val fos = new GZIPOutputStream(new io.FileOutputStream(file))
val fos = new io.BufferedOutputStream(new io.FileOutputStream(file))
fos.write(this.data)
fos.close()
}
catch {
case e: io.IOException => OpenComputers.log.warn(s"Error saving auxiliary tile entity data to '${file.getAbsolutePath}.", e)
}
}
}
val stateSaveHandler: SafeThreadPool = ThreadPoolFactory.createSafePool("SaveHandler", 1)
val chunkDirs = new ConcurrentLinkedDeque[io.File]()
val saving = mutable.HashMap.empty[String, Future[_]]
def savePath = new io.File(DimensionManager.getCurrentSaveRootDirectory, Settings.savePath) def savePath = new io.File(DimensionManager.getCurrentSaveRootDirectory, Settings.savePath)
@ -113,37 +144,34 @@ object SaveHandler {
val dimension = nbt.getInteger("dimension") val dimension = nbt.getInteger("dimension")
val chunk = new ChunkCoordIntPair(nbt.getInteger("chunkX"), nbt.getInteger("chunkZ")) val chunk = new ChunkCoordIntPair(nbt.getInteger("chunkX"), nbt.getInteger("chunkZ"))
// Wait for the latest save task for the requested file to complete.
// This prevents the chance of loading an outdated version
// of this file.
saving.get(name).foreach(f => try {
f.get(120L, TimeUnit.SECONDS)
} catch {
case e: TimeoutException => OpenComputers.log.warn("Waiting for state data to save took two minutes! Aborting.")
case e: CancellationException => // NO-OP
})
saving.remove(name)
load(dimension, chunk, name) load(dimension, chunk, name)
} }
def scheduleSave(dimension: Int, chunk: ChunkCoordIntPair, name: String, data: Array[Byte]) = saveData.synchronized { def scheduleSave(dimension: Int, chunk: ChunkCoordIntPair, name: String, data: Array[Byte]): Unit = {
if (chunk == null) throw new IllegalArgumentException("chunk is null") if (chunk == null) throw new IllegalArgumentException("chunk is null")
else { else {
// Make sure we get rid of old versions (e.g. left over by other mods // Disregarding whether or not there already was a
// triggering a save - this is mostly used for RiM compatibility). We // save submitted for the requested file
// need to do this for *each* dimension, in case computers are teleported // allows for better concurrency at the cost of
// across dimensions. // doing more writing operations.
for (chunks <- saveData.values) chunks.values.foreach(_ -= name) stateSaveHandler.withPool(_.submit(new SaveDataEntry(data, chunk, name, dimension))).foreach(saving.put(name, _))
val chunks = saveData.getOrElseUpdate(dimension, mutable.Map.empty)
chunks.getOrElseUpdate(chunk, mutable.Map.empty) += name -> data
} }
} }
def load(dimension: Int, chunk: ChunkCoordIntPair, name: String): Array[Byte] = { def load(dimension: Int, chunk: ChunkCoordIntPair, name: String): Array[Byte] = {
if (chunk == null) throw new IllegalArgumentException("chunk is null") if (chunk == null) throw new IllegalArgumentException("chunk is null")
// Use data from 'cache' if possible. This avoids weird things happening
// when writeToNBT+readFromNBT is called by other mods (i.e. this is not
// used to actually save the data to disk).
saveData.get(dimension) match {
case Some(chunks) => chunks.get(chunk) match {
case Some(map) => map.get(name) match {
case Some(data) => return data
case _ =>
}
case _ =>
}
case _ =>
}
val path = statePath val path = statePath
val dimPath = new io.File(path, dimension.toString) val dimPath = new io.File(path, dimension.toString)
val chunkPath = new io.File(dimPath, s"${chunk.chunkXPos}.${chunk.chunkZPos}") val chunkPath = new io.File(dimPath, s"${chunk.chunkXPos}.${chunk.chunkZPos}")
@ -171,35 +199,28 @@ object SaveHandler {
} }
} }
@SubscribeEvent def cleanSaveData(): Unit = {
def onChunkSave(e: ChunkDataEvent.Save) = saveData.synchronized { while (!chunkDirs.isEmpty) {
val path = statePath val chunkPath = chunkDirs.poll()
val dimension = e.world.provider.dimensionId
val chunk = e.getChunk.getChunkCoordIntPair
val dimPath = new io.File(path, dimension.toString)
val chunkPath = new io.File(dimPath, s"${chunk.chunkXPos}.${chunk.chunkZPos}")
if (chunkPath.exists && chunkPath.isDirectory && chunkPath.list() != null) { if (chunkPath.exists && chunkPath.isDirectory && chunkPath.list() != null) {
for (file <- chunkPath.listFiles() if System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves) file.delete() for (file <- chunkPath.listFiles() if System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves) file.delete()
} }
saveData.get(dimension) match {
case Some(chunks) => chunks.get(chunk) match {
case Some(entries) =>
chunkPath.mkdirs()
for ((name, data) <- entries) {
val file = new io.File(chunkPath, name)
try {
// val fos = new GZIPOutputStream(new io.FileOutputStream(file))
val fos = new io.BufferedOutputStream(new io.FileOutputStream(file))
fos.write(data)
fos.close()
} }
catch {
case e: io.IOException => OpenComputers.log.warn(s"Error saving auxiliary tile entity data to '${file.getAbsolutePath}.", e) // Delete empty folders to keep the state folder clean.
val emptyDirs = savePath.listFiles(new FileFilter {
override def accept(file: File) = file.isDirectory &&
// Make sure we only consider file system folders (UUID).
file.getName.matches(uuidRegex) &&
// We set the modified time in the save() method of unbuffered file
// systems, to avoid deleting in-use folders here.
System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves && {
val list = file.list()
list == null || list.isEmpty
} }
} })
case _ => chunkPath.delete() if (emptyDirs != null) {
} emptyDirs.filter(_ != null).foreach(_.delete())
case _ =>
} }
} }
@ -224,28 +245,9 @@ object SaveHandler {
@SubscribeEvent(priority = EventPriority.LOWEST) @SubscribeEvent(priority = EventPriority.LOWEST)
def onWorldSave(e: WorldEvent.Save) { def onWorldSave(e: WorldEvent.Save) {
saveData.synchronized { stateSaveHandler.withPool(_.submit(new Runnable {
saveData.get(e.world.provider.dimensionId) match { override def run(): Unit = cleanSaveData()
case Some(chunks) => chunks.clear() }))
case _ =>
}
}
// Delete empty folders to keep the state folder clean.
val emptyDirs = savePath.listFiles(new FileFilter {
override def accept(file: File) = file.isDirectory &&
// Make sure we only consider file system folders (UUID).
file.getName.matches(uuidRegex) &&
// We set the modified time in the save() method of unbuffered file
// systems, to avoid deleting in-use folders here.
System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves && {
val list = file.list()
list == null || list.isEmpty
}
})
if (emptyDirs != null) {
emptyDirs.filter(_ != null).foreach(_.delete())
}
} }
} }

View File

@ -9,11 +9,17 @@ import java.util.concurrent.TimeoutException
import li.cil.oc.OpenComputers import li.cil.oc.OpenComputers
import li.cil.oc.api.fs.Mode import li.cil.oc.api.fs.Mode
import li.cil.oc.util.ThreadPoolFactory
import li.cil.oc.util.SafeThreadPool
import net.minecraft.nbt.NBTTagCompound import net.minecraft.nbt.NBTTagCompound
import org.apache.commons.io.FileUtils import org.apache.commons.io.FileUtils
import scala.collection.mutable import scala.collection.mutable
object Buffered {
val fileSaveHandler: SafeThreadPool = ThreadPoolFactory.createSafePool("FileSystem", 1)
}
trait Buffered extends OutputStreamFileSystem { trait Buffered extends OutputStreamFileSystem {
protected def fileRoot: io.File protected def fileRoot: io.File
@ -58,7 +64,7 @@ trait Buffered extends OutputStreamFileSystem {
for (child <- directory.listFiles() if FileSystem.isValidFilename(child.getName)) { for (child <- directory.listFiles() if FileSystem.isValidFilename(child.getName)) {
val childPath = path + child.getName val childPath = path + child.getName
val childFile = new io.File(directory, child.getName) val childFile = new io.File(directory, child.getName)
if (child.exists() && child .isDirectory && child.list() != null) { if (child.exists() && child.isDirectory && child.list() != null) {
recurse(childPath + "/", childFile) recurse(childPath + "/", childFile)
} }
else if (!exists(childPath) || !isDirectory(childPath)) { else if (!exists(childPath) || !isDirectory(childPath)) {
@ -96,7 +102,9 @@ trait Buffered extends OutputStreamFileSystem {
override def save(nbt: NBTTagCompound) = { override def save(nbt: NBTTagCompound) = {
super.save(nbt) super.save(nbt)
saving = BufferedFileSaveHandler.scheduleSave(this) saving = Buffered.fileSaveHandler.withPool(_.submit(new Runnable {
override def run(): Unit = saveFiles()
}))
} }
def saveFiles(): Unit = this.synchronized { def saveFiles(): Unit = this.synchronized {

View File

@ -1,63 +0,0 @@
package li.cil.oc.server.fs
import java.util.concurrent.Future
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import li.cil.oc.OpenComputers
import li.cil.oc.util.ThreadPoolFactory
object BufferedFileSaveHandler {
private var _threadPool: ScheduledExecutorService = _
private def withPool(f: ScheduledExecutorService => Future[_], requiresPool: Boolean = true): Option[Future[_]] = {
if (_threadPool == null) {
OpenComputers.log.warn("Error handling file saving: Did the server never start?")
if (requiresPool) {
OpenComputers.log.warn("Creating new thread pool.")
newThreadPool()
} else {
return None
}
} else if (_threadPool.isShutdown || _threadPool.isTerminated) {
OpenComputers.log.warn("Error handling file saving: Thread pool shut down!")
if (requiresPool) {
OpenComputers.log.warn("Creating new thread pool.")
newThreadPool()
} else {
return None
}
}
Option(f(_threadPool))
}
def newThreadPool(): Unit = {
if (_threadPool != null && !_threadPool.isTerminated) {
_threadPool.shutdownNow()
}
_threadPool = ThreadPoolFactory.create("FileSystem", 1)
}
def scheduleSave(fs: Buffered): Option[Future[_]] = withPool(threadPool => threadPool.submit(new Runnable {
override def run(): Unit = fs.saveFiles()
}))
def waitForSaving(): Unit = withPool(threadPool => {
try {
threadPool.shutdown()
var terminated = threadPool.awaitTermination(15, TimeUnit.SECONDS)
if (!terminated) {
OpenComputers.log.warn("Warning: Saving the filesystem has already taken 15 seconds!")
terminated = threadPool.awaitTermination(105, TimeUnit.SECONDS)
if (!terminated) {
OpenComputers.log.error("Warning: Saving the filesystem has already taken two minutes! Aborting")
threadPool.shutdownNow()
}
}
} catch {
case e: InterruptedException => e.printStackTrace()
}
null
}, requiresPool = false)
}

View File

@ -1,11 +1,17 @@
package li.cil.oc.util package li.cil.oc.util
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ThreadFactory import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import li.cil.oc.OpenComputers
import li.cil.oc.Settings import li.cil.oc.Settings
import scala.collection.mutable
object ThreadPoolFactory { object ThreadPoolFactory {
val priority = { val priority = {
val custom = Settings.get.threadPriority val custom = Settings.get.threadPriority
@ -35,4 +41,62 @@ object ThreadPoolFactory {
thread thread
} }
}) })
val safePools: mutable.ArrayBuffer[SafeThreadPool] = mutable.ArrayBuffer.empty[SafeThreadPool]
def createSafePool(name: String, threads: Int): SafeThreadPool = {
val handler = new SafeThreadPool(name, threads)
safePools += handler
handler
}
}
class SafeThreadPool(val name: String, val threads: Int) {
private var _threadPool: ScheduledExecutorService = _
def withPool(f: ScheduledExecutorService => Future[_], requiresPool: Boolean = true): Option[Future[_]] = {
if (_threadPool == null) {
OpenComputers.log.warn("Error handling file saving: Did the server never start?")
if (requiresPool) {
OpenComputers.log.warn("Creating new thread pool.")
newThreadPool()
} else {
return None
}
} else if (_threadPool.isShutdown || _threadPool.isTerminated) {
OpenComputers.log.warn("Error handling file saving: Thread pool shut down!")
if (requiresPool) {
OpenComputers.log.warn("Creating new thread pool.")
newThreadPool()
} else {
return None
}
}
Option(f(_threadPool))
}
def newThreadPool(): Unit = {
if (_threadPool != null && !_threadPool.isTerminated) {
_threadPool.shutdownNow()
}
_threadPool = ThreadPoolFactory.create(name, threads)
}
def waitForCompletion(): Unit = withPool(threadPool => {
try {
threadPool.shutdown()
var terminated = threadPool.awaitTermination(15, TimeUnit.SECONDS)
if (!terminated) {
OpenComputers.log.warn("Warning: Completing all tasks has already taken 15 seconds!")
terminated = threadPool.awaitTermination(105, TimeUnit.SECONDS)
if (!terminated) {
OpenComputers.log.error("Warning: Completing all tasks has already taken two minutes! Aborting")
threadPool.shutdownNow()
}
}
} catch {
case e: InterruptedException => e.printStackTrace()
}
null
}, requiresPool = false)
} }