Merge branch 'master-MC1.7.10' into master-MC1.10

# Conflicts:
#	src/main/scala/li/cil/oc/OpenComputers.scala
#	src/main/scala/li/cil/oc/common/SaveHandler.scala
This commit is contained in:
payonel 2018-11-09 17:21:48 -08:00
commit 4475e6212a
4 changed files with 187 additions and 80 deletions

View File

@ -9,6 +9,7 @@ import net.minecraftforge.fml.common.SidedProxy
import net.minecraftforge.fml.common.event.FMLInterModComms.IMCEvent import net.minecraftforge.fml.common.event.FMLInterModComms.IMCEvent
import net.minecraftforge.fml.common.event._ import net.minecraftforge.fml.common.event._
import net.minecraftforge.fml.common.network.FMLEventChannel import net.minecraftforge.fml.common.network.FMLEventChannel
import li.cil.oc.util.ThreadPoolFactory
import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger import org.apache.logging.log4j.Logger
@ -54,7 +55,15 @@ object OpenComputers {
def missingMappings(e: FMLMissingMappingsEvent) = proxy.missingMappings(e) def missingMappings(e: FMLMissingMappingsEvent) = proxy.missingMappings(e)
@EventHandler @EventHandler
def serverStart(e: FMLServerStartingEvent) = CommandHandler.register(e) def serverStart(e: FMLServerStartingEvent): Unit = {
CommandHandler.register(e)
ThreadPoolFactory.safePools.foreach(_.newThreadPool())
}
@EventHandler
def serverStop(e: FMLServerStoppedEvent): Unit = {
ThreadPoolFactory.safePools.foreach(_.waitForCompletion())
}
@EventHandler @EventHandler
def imc(e: IMCEvent) = IMC.handleEvent(e) def imc(e: IMCEvent) = IMC.handleEvent(e)

View File

@ -4,18 +4,24 @@ import java.io
import java.io._ import java.io._
import java.nio.file._ import java.nio.file._
import java.nio.file.attribute.BasicFileAttributes import java.nio.file.attribute.BasicFileAttributes
import java.util.concurrent.CancellationException
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import li.cil.oc.OpenComputers import li.cil.oc.OpenComputers
import li.cil.oc.Settings import li.cil.oc.Settings
import li.cil.oc.api.machine.MachineHost import li.cil.oc.api.machine.MachineHost
import li.cil.oc.api.network.EnvironmentHost import li.cil.oc.api.network.EnvironmentHost
import li.cil.oc.util.BlockPosition import li.cil.oc.util.BlockPosition
import li.cil.oc.util.SafeThreadPool
import li.cil.oc.util.ThreadPoolFactory
import net.minecraft.nbt.CompressedStreamTools import net.minecraft.nbt.CompressedStreamTools
import net.minecraft.nbt.NBTTagCompound import net.minecraft.nbt.NBTTagCompound
import net.minecraft.util.math.ChunkPos import net.minecraft.util.math.ChunkPos
import net.minecraft.world.World import net.minecraft.world.World
import net.minecraftforge.common.DimensionManager import net.minecraftforge.common.DimensionManager
import net.minecraftforge.event.world.ChunkDataEvent
import net.minecraftforge.event.world.WorldEvent import net.minecraftforge.event.world.WorldEvent
import net.minecraftforge.fml.common.eventhandler.EventPriority import net.minecraftforge.fml.common.eventhandler.EventPriority
import net.minecraftforge.fml.common.eventhandler.SubscribeEvent import net.minecraftforge.fml.common.eventhandler.SubscribeEvent
@ -42,7 +48,32 @@ object SaveHandler {
// which takes a lot of time and is completely unnecessary in those cases. // which takes a lot of time and is completely unnecessary in those cases.
var savingForClients = false var savingForClients = false
val saveData = mutable.Map.empty[Int, mutable.Map[ChunkPos, mutable.Map[String, Array[Byte]]]] class SaveDataEntry(val data: Array[Byte], val pos: ChunkPos, val name: String, val dimension: Int) extends Runnable {
override def run(): Unit = {
val path = statePath
val dimPath = new io.File(path, dimension.toString)
val chunkPath = new io.File(dimPath, s"${this.pos.chunkXPos}.${this.pos.chunkZPos}")
chunkDirs.add(chunkPath)
if (!chunkPath.exists()) {
chunkPath.mkdirs()
}
val file = new io.File(chunkPath, this.name)
try {
// val fos = new GZIPOutputStream(new io.FileOutputStream(file))
val fos = new io.BufferedOutputStream(new io.FileOutputStream(file))
fos.write(this.data)
fos.close()
}
catch {
case e: io.IOException => OpenComputers.log.warn(s"Error saving auxiliary tile entity data to '${file.getAbsolutePath}.", e)
}
}
}
val stateSaveHandler: SafeThreadPool = ThreadPoolFactory.createSafePool("SaveHandler", 1)
val chunkDirs = new ConcurrentLinkedDeque[io.File]()
val saving = mutable.HashMap.empty[String, Future[_]]
def savePath = new io.File(DimensionManager.getCurrentSaveRootDirectory, Settings.savePath) def savePath = new io.File(DimensionManager.getCurrentSaveRootDirectory, Settings.savePath)
@ -113,37 +144,34 @@ object SaveHandler {
val dimension = nbt.getInteger("dimension") val dimension = nbt.getInteger("dimension")
val chunk = new ChunkPos(nbt.getInteger("chunkX"), nbt.getInteger("chunkZ")) val chunk = new ChunkPos(nbt.getInteger("chunkX"), nbt.getInteger("chunkZ"))
// Wait for the latest save task for the requested file to complete.
// This prevents the chance of loading an outdated version
// of this file.
saving.get(name).foreach(f => try {
f.get(120L, TimeUnit.SECONDS)
} catch {
case e: TimeoutException => OpenComputers.log.warn("Waiting for state data to save took two minutes! Aborting.")
case e: CancellationException => // NO-OP
})
saving.remove(name)
load(dimension, chunk, name) load(dimension, chunk, name)
} }
def scheduleSave(dimension: Int, chunk: ChunkPos, name: String, data: Array[Byte]) = saveData.synchronized { def scheduleSave(dimension: Int, chunk: ChunkPos, name: String, data: Array[Byte]): Unit = {
if (chunk == null) throw new IllegalArgumentException("chunk is null") if (chunk == null) throw new IllegalArgumentException("chunk is null")
else { else {
// Make sure we get rid of old versions (e.g. left over by other mods // Disregarding whether or not there already was a
// triggering a save - this is mostly used for RiM compatibility). We // save submitted for the requested file
// need to do this for *each* dimension, in case computers are teleported // allows for better concurrency at the cost of
// across dimensions. // doing more writing operations.
for (chunks <- saveData.values) chunks.values.foreach(_ -= name) stateSaveHandler.withPool(_.submit(new SaveDataEntry(data, chunk, name, dimension))).foreach(saving.put(name, _))
val chunks = saveData.getOrElseUpdate(dimension, mutable.Map.empty)
chunks.getOrElseUpdate(chunk, mutable.Map.empty) += name -> data
} }
} }
def load(dimension: Int, chunk: ChunkPos, name: String): Array[Byte] = { def load(dimension: Int, chunk: ChunkPos, name: String): Array[Byte] = {
if (chunk == null) throw new IllegalArgumentException("chunk is null") if (chunk == null) throw new IllegalArgumentException("chunk is null")
// Use data from 'cache' if possible. This avoids weird things happening
// when writeToNBT+readFromNBT is called by other mods (i.e. this is not
// used to actually save the data to disk).
saveData.get(dimension) match {
case Some(chunks) => chunks.get(chunk) match {
case Some(map) => map.get(name) match {
case Some(data) => return data
case _ =>
}
case _ =>
}
case _ =>
}
val path = statePath val path = statePath
val dimPath = new io.File(path, dimension.toString) val dimPath = new io.File(path, dimension.toString)
val chunkPath = new io.File(dimPath, s"${chunk.chunkXPos}.${chunk.chunkZPos}") val chunkPath = new io.File(dimPath, s"${chunk.chunkXPos}.${chunk.chunkZPos}")
@ -171,35 +199,28 @@ object SaveHandler {
} }
} }
@SubscribeEvent def cleanSaveData(): Unit = {
def onChunkSave(e: ChunkDataEvent.Save) = saveData.synchronized { while (!chunkDirs.isEmpty) {
val path = statePath val chunkPath = chunkDirs.poll()
val dimension = e.getWorld.provider.getDimension if (chunkPath.exists && chunkPath.isDirectory && chunkPath.list() != null) {
val chunk = e.getChunk.getChunkCoordIntPair for (file <- chunkPath.listFiles() if System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves) file.delete()
val dimPath = new io.File(path, dimension.toString)
val chunkPath = new io.File(dimPath, s"${chunk.chunkXPos}.${chunk.chunkZPos}")
if (chunkPath.exists && chunkPath.isDirectory && chunkPath.list() != null) {
for (file <- chunkPath.listFiles() if System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves) file.delete()
}
saveData.get(dimension) match {
case Some(chunks) => chunks.get(chunk) match {
case Some(entries) =>
chunkPath.mkdirs()
for ((name, data) <- entries) {
val file = new io.File(chunkPath, name)
try {
// val fos = new GZIPOutputStream(new io.FileOutputStream(file))
val fos = new io.BufferedOutputStream(new io.FileOutputStream(file))
fos.write(data)
fos.close()
}
catch {
case e: io.IOException => OpenComputers.log.warn(s"Error saving auxiliary tile entity data to '${file.getAbsolutePath}.", e)
}
}
case _ => chunkPath.delete()
} }
case _ => }
// Delete empty folders to keep the state folder clean.
val emptyDirs = savePath.listFiles(new FileFilter {
override def accept(file: File) = file.isDirectory &&
// Make sure we only consider file system folders (UUID).
file.getName.matches(uuidRegex) &&
// We set the modified time in the save() method of unbuffered file
// systems, to avoid deleting in-use folders here.
System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves && {
val list = file.list()
list == null || list.isEmpty
}
})
if (emptyDirs != null) {
emptyDirs.filter(_ != null).foreach(_.delete())
} }
} }
@ -226,28 +247,9 @@ object SaveHandler {
@SubscribeEvent(priority = EventPriority.LOWEST) @SubscribeEvent(priority = EventPriority.LOWEST)
def onWorldSave(e: WorldEvent.Save) { def onWorldSave(e: WorldEvent.Save) {
saveData.synchronized { stateSaveHandler.withPool(_.submit(new Runnable {
saveData.get(e.getWorld.provider.getDimension) match { override def run(): Unit = cleanSaveData()
case Some(chunks) => chunks.clear() }))
case _ =>
}
}
// Delete empty folders to keep the state folder clean.
val emptyDirs = savePath.listFiles(new FileFilter {
override def accept(file: File) = file.isDirectory &&
// Make sure we only consider file system folders (UUID).
file.getName.matches(uuidRegex) &&
// We set the modified time in the save() method of unbuffered file
// systems, to avoid deleting in-use folders here.
System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves && {
val list = file.list()
list == null || list.isEmpty
}
})
if (emptyDirs != null) {
emptyDirs.filter(_ != null).foreach(_.delete())
}
} }
} }

View File

@ -2,13 +2,24 @@ package li.cil.oc.server.fs
import java.io import java.io
import java.io.FileNotFoundException import java.io.FileNotFoundException
import java.util.concurrent.CancellationException
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import li.cil.oc.OpenComputers
import li.cil.oc.api.fs.Mode import li.cil.oc.api.fs.Mode
import li.cil.oc.util.ThreadPoolFactory
import li.cil.oc.util.SafeThreadPool
import net.minecraft.nbt.NBTTagCompound import net.minecraft.nbt.NBTTagCompound
import org.apache.commons.io.FileUtils import org.apache.commons.io.FileUtils
import scala.collection.mutable import scala.collection.mutable
object Buffered {
val fileSaveHandler: SafeThreadPool = ThreadPoolFactory.createSafePool("FileSystem", 1)
}
trait Buffered extends OutputStreamFileSystem { trait Buffered extends OutputStreamFileSystem {
protected def fileRoot: io.File protected def fileRoot: io.File
@ -34,7 +45,20 @@ trait Buffered extends OutputStreamFileSystem {
// ----------------------------------------------------------------------- // // ----------------------------------------------------------------------- //
override def load(nbt: NBTTagCompound) = { private var saving: Option[Future[_]] = None
override def load(nbt: NBTTagCompound): Unit = {
saving.foreach(f => try {
f.get(120L, TimeUnit.SECONDS)
} catch {
case e: TimeoutException => OpenComputers.log.warn("Waiting for filesystem to save took two minutes! Aborting.")
case e: CancellationException => // NO-OP
})
loadFiles(nbt)
super.load(nbt)
}
private def loadFiles(nbt: NBTTagCompound): Unit = this.synchronized {
def recurse(path: String, directory: io.File) { def recurse(path: String, directory: io.File) {
makeDirectory(path) makeDirectory(path)
for (child <- directory.listFiles() if FileSystem.isValidFilename(child.getName)) { for (child <- directory.listFiles() if FileSystem.isValidFilename(child.getName)) {
@ -74,13 +98,16 @@ trait Buffered extends OutputStreamFileSystem {
fileRoot.delete() fileRoot.delete()
} }
else recurse("", fileRoot) else recurse("", fileRoot)
super.load(nbt)
} }
override def save(nbt: NBTTagCompound) = { override def save(nbt: NBTTagCompound) = {
super.save(nbt) super.save(nbt)
saving = Buffered.fileSaveHandler.withPool(_.submit(new Runnable {
override def run(): Unit = saveFiles()
}))
}
def saveFiles(): Unit = this.synchronized {
for ((path, time) <- deletions) { for ((path, time) <- deletions) {
val file = new io.File(fileRoot, path) val file = new io.File(fileRoot, path)
if (FileUtils.isFileOlder(file, time)) if (FileUtils.isFileOlder(file, time))
@ -88,13 +115,14 @@ trait Buffered extends OutputStreamFileSystem {
} }
deletions.clear() deletions.clear()
def recurse(path: String) { def recurse(path: String):Boolean = {
val directory = new io.File(fileRoot, path) val directory = new io.File(fileRoot, path)
directory.mkdirs() directory.mkdirs()
var dirChanged = false
for (child <- list(path)) { for (child <- list(path)) {
val childPath = path + child val childPath = path + child
if (isDirectory(childPath)) if (isDirectory(childPath))
recurse(childPath) dirChanged = recurse(childPath) || dirChanged
else { else {
val childFile = new io.File(fileRoot, childPath) val childFile = new io.File(fileRoot, childPath)
val time = lastModified(childPath) val time = lastModified(childPath)
@ -107,10 +135,14 @@ trait Buffered extends OutputStreamFileSystem {
out.close() out.close()
in.close() in.close()
childFile.setLastModified(time) childFile.setLastModified(time)
dirChanged = true
} }
} }
} }
directory.setLastModified(lastModified(path)) if (dirChanged) {
directory.setLastModified(lastModified(path))
true
} else false
} }
if (list("") == null || list("").isEmpty) { if (list("") == null || list("").isEmpty) {
fileRoot.delete() fileRoot.delete()

View File

@ -1,11 +1,17 @@
package li.cil.oc.util package li.cil.oc.util
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ThreadFactory import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import li.cil.oc.OpenComputers
import li.cil.oc.Settings import li.cil.oc.Settings
import scala.collection.mutable
object ThreadPoolFactory { object ThreadPoolFactory {
val priority = { val priority = {
val custom = Settings.get.threadPriority val custom = Settings.get.threadPriority
@ -35,4 +41,62 @@ object ThreadPoolFactory {
thread thread
} }
}) })
val safePools: mutable.ArrayBuffer[SafeThreadPool] = mutable.ArrayBuffer.empty[SafeThreadPool]
def createSafePool(name: String, threads: Int): SafeThreadPool = {
val handler = new SafeThreadPool(name, threads)
safePools += handler
handler
}
}
class SafeThreadPool(val name: String, val threads: Int) {
private var _threadPool: ScheduledExecutorService = _
def withPool(f: ScheduledExecutorService => Future[_], requiresPool: Boolean = true): Option[Future[_]] = {
if (_threadPool == null) {
OpenComputers.log.warn("Error handling file saving: Did the server never start?")
if (requiresPool) {
OpenComputers.log.warn("Creating new thread pool.")
newThreadPool()
} else {
return None
}
} else if (_threadPool.isShutdown || _threadPool.isTerminated) {
OpenComputers.log.warn("Error handling file saving: Thread pool shut down!")
if (requiresPool) {
OpenComputers.log.warn("Creating new thread pool.")
newThreadPool()
} else {
return None
}
}
Option(f(_threadPool))
}
def newThreadPool(): Unit = {
if (_threadPool != null && !_threadPool.isTerminated) {
_threadPool.shutdownNow()
}
_threadPool = ThreadPoolFactory.create(name, threads)
}
def waitForCompletion(): Unit = withPool(threadPool => {
try {
threadPool.shutdown()
var terminated = threadPool.awaitTermination(15, TimeUnit.SECONDS)
if (!terminated) {
OpenComputers.log.warn("Warning: Completing all tasks has already taken 15 seconds!")
terminated = threadPool.awaitTermination(105, TimeUnit.SECONDS)
if (!terminated) {
OpenComputers.log.error("Warning: Completing all tasks has already taken two minutes! Aborting")
threadPool.shutdownNow()
}
}
} catch {
case e: InterruptedException => e.printStackTrace()
}
null
}, requiresPool = false)
} }