diff --git a/src/main/scala/li/cil/oc/OpenComputers.scala b/src/main/scala/li/cil/oc/OpenComputers.scala index 3b3c79b5e..102356217 100644 --- a/src/main/scala/li/cil/oc/OpenComputers.scala +++ b/src/main/scala/li/cil/oc/OpenComputers.scala @@ -9,6 +9,7 @@ import net.minecraftforge.fml.common.SidedProxy import net.minecraftforge.fml.common.event.FMLInterModComms.IMCEvent import net.minecraftforge.fml.common.event._ import net.minecraftforge.fml.common.network.FMLEventChannel +import li.cil.oc.util.ThreadPoolFactory import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.Logger @@ -54,7 +55,15 @@ object OpenComputers { def missingMappings(e: FMLMissingMappingsEvent) = proxy.missingMappings(e) @EventHandler - def serverStart(e: FMLServerStartingEvent) = CommandHandler.register(e) + def serverStart(e: FMLServerStartingEvent): Unit = { + CommandHandler.register(e) + ThreadPoolFactory.safePools.foreach(_.newThreadPool()) + } + + @EventHandler + def serverStop(e: FMLServerStoppedEvent): Unit = { + ThreadPoolFactory.safePools.foreach(_.waitForCompletion()) + } @EventHandler def imc(e: IMCEvent) = IMC.handleEvent(e) diff --git a/src/main/scala/li/cil/oc/common/SaveHandler.scala b/src/main/scala/li/cil/oc/common/SaveHandler.scala index f9cf0c98a..b5293086c 100644 --- a/src/main/scala/li/cil/oc/common/SaveHandler.scala +++ b/src/main/scala/li/cil/oc/common/SaveHandler.scala @@ -4,18 +4,24 @@ import java.io import java.io._ import java.nio.file._ import java.nio.file.attribute.BasicFileAttributes +import java.util.concurrent.CancellationException +import java.util.concurrent.ConcurrentLinkedDeque +import java.util.concurrent.Future +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException import li.cil.oc.OpenComputers import li.cil.oc.Settings import li.cil.oc.api.machine.MachineHost import li.cil.oc.api.network.EnvironmentHost import li.cil.oc.util.BlockPosition +import li.cil.oc.util.SafeThreadPool +import li.cil.oc.util.ThreadPoolFactory import net.minecraft.nbt.CompressedStreamTools import net.minecraft.nbt.NBTTagCompound import net.minecraft.util.math.ChunkPos import net.minecraft.world.World import net.minecraftforge.common.DimensionManager -import net.minecraftforge.event.world.ChunkDataEvent import net.minecraftforge.event.world.WorldEvent import net.minecraftforge.fml.common.eventhandler.EventPriority import net.minecraftforge.fml.common.eventhandler.SubscribeEvent @@ -42,7 +48,32 @@ object SaveHandler { // which takes a lot of time and is completely unnecessary in those cases. var savingForClients = false - val saveData = mutable.Map.empty[Int, mutable.Map[ChunkPos, mutable.Map[String, Array[Byte]]]] + class SaveDataEntry(val data: Array[Byte], val pos: ChunkPos, val name: String, val dimension: Int) extends Runnable { + override def run(): Unit = { + val path = statePath + val dimPath = new io.File(path, dimension.toString) + val chunkPath = new io.File(dimPath, s"${this.pos.chunkXPos}.${this.pos.chunkZPos}") + chunkDirs.add(chunkPath) + if (!chunkPath.exists()) { + chunkPath.mkdirs() + } + val file = new io.File(chunkPath, this.name) + try { + // val fos = new GZIPOutputStream(new io.FileOutputStream(file)) + val fos = new io.BufferedOutputStream(new io.FileOutputStream(file)) + fos.write(this.data) + fos.close() + } + catch { + case e: io.IOException => OpenComputers.log.warn(s"Error saving auxiliary tile entity data to '${file.getAbsolutePath}.", e) + } + } + } + + val stateSaveHandler: SafeThreadPool = ThreadPoolFactory.createSafePool("SaveHandler", 1) + + val chunkDirs = new ConcurrentLinkedDeque[io.File]() + val saving = mutable.HashMap.empty[String, Future[_]] def savePath = new io.File(DimensionManager.getCurrentSaveRootDirectory, Settings.savePath) @@ -113,37 +144,34 @@ object SaveHandler { val dimension = nbt.getInteger("dimension") val chunk = new ChunkPos(nbt.getInteger("chunkX"), nbt.getInteger("chunkZ")) + // Wait for the latest save task for the requested file to complete. + // This prevents the chance of loading an outdated version + // of this file. + saving.get(name).foreach(f => try { + f.get(120L, TimeUnit.SECONDS) + } catch { + case e: TimeoutException => OpenComputers.log.warn("Waiting for state data to save took two minutes! Aborting.") + case e: CancellationException => // NO-OP + }) + saving.remove(name) + load(dimension, chunk, name) } - def scheduleSave(dimension: Int, chunk: ChunkPos, name: String, data: Array[Byte]) = saveData.synchronized { + def scheduleSave(dimension: Int, chunk: ChunkPos, name: String, data: Array[Byte]): Unit = { if (chunk == null) throw new IllegalArgumentException("chunk is null") else { - // Make sure we get rid of old versions (e.g. left over by other mods - // triggering a save - this is mostly used for RiM compatibility). We - // need to do this for *each* dimension, in case computers are teleported - // across dimensions. - for (chunks <- saveData.values) chunks.values.foreach(_ -= name) - val chunks = saveData.getOrElseUpdate(dimension, mutable.Map.empty) - chunks.getOrElseUpdate(chunk, mutable.Map.empty) += name -> data + // Disregarding whether or not there already was a + // save submitted for the requested file + // allows for better concurrency at the cost of + // doing more writing operations. + stateSaveHandler.withPool(_.submit(new SaveDataEntry(data, chunk, name, dimension))).foreach(saving.put(name, _)) } } def load(dimension: Int, chunk: ChunkPos, name: String): Array[Byte] = { if (chunk == null) throw new IllegalArgumentException("chunk is null") - // Use data from 'cache' if possible. This avoids weird things happening - // when writeToNBT+readFromNBT is called by other mods (i.e. this is not - // used to actually save the data to disk). - saveData.get(dimension) match { - case Some(chunks) => chunks.get(chunk) match { - case Some(map) => map.get(name) match { - case Some(data) => return data - case _ => - } - case _ => - } - case _ => - } + val path = statePath val dimPath = new io.File(path, dimension.toString) val chunkPath = new io.File(dimPath, s"${chunk.chunkXPos}.${chunk.chunkZPos}") @@ -171,35 +199,28 @@ object SaveHandler { } } - @SubscribeEvent - def onChunkSave(e: ChunkDataEvent.Save) = saveData.synchronized { - val path = statePath - val dimension = e.getWorld.provider.getDimension - val chunk = e.getChunk.getChunkCoordIntPair - val dimPath = new io.File(path, dimension.toString) - val chunkPath = new io.File(dimPath, s"${chunk.chunkXPos}.${chunk.chunkZPos}") - if (chunkPath.exists && chunkPath.isDirectory && chunkPath.list() != null) { - for (file <- chunkPath.listFiles() if System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves) file.delete() - } - saveData.get(dimension) match { - case Some(chunks) => chunks.get(chunk) match { - case Some(entries) => - chunkPath.mkdirs() - for ((name, data) <- entries) { - val file = new io.File(chunkPath, name) - try { - // val fos = new GZIPOutputStream(new io.FileOutputStream(file)) - val fos = new io.BufferedOutputStream(new io.FileOutputStream(file)) - fos.write(data) - fos.close() - } - catch { - case e: io.IOException => OpenComputers.log.warn(s"Error saving auxiliary tile entity data to '${file.getAbsolutePath}.", e) - } - } - case _ => chunkPath.delete() + def cleanSaveData(): Unit = { + while (!chunkDirs.isEmpty) { + val chunkPath = chunkDirs.poll() + if (chunkPath.exists && chunkPath.isDirectory && chunkPath.list() != null) { + for (file <- chunkPath.listFiles() if System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves) file.delete() } - case _ => + } + + // Delete empty folders to keep the state folder clean. + val emptyDirs = savePath.listFiles(new FileFilter { + override def accept(file: File) = file.isDirectory && + // Make sure we only consider file system folders (UUID). + file.getName.matches(uuidRegex) && + // We set the modified time in the save() method of unbuffered file + // systems, to avoid deleting in-use folders here. + System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves && { + val list = file.list() + list == null || list.isEmpty + } + }) + if (emptyDirs != null) { + emptyDirs.filter(_ != null).foreach(_.delete()) } } @@ -226,28 +247,9 @@ object SaveHandler { @SubscribeEvent(priority = EventPriority.LOWEST) def onWorldSave(e: WorldEvent.Save) { - saveData.synchronized { - saveData.get(e.getWorld.provider.getDimension) match { - case Some(chunks) => chunks.clear() - case _ => - } - } - - // Delete empty folders to keep the state folder clean. - val emptyDirs = savePath.listFiles(new FileFilter { - override def accept(file: File) = file.isDirectory && - // Make sure we only consider file system folders (UUID). - file.getName.matches(uuidRegex) && - // We set the modified time in the save() method of unbuffered file - // systems, to avoid deleting in-use folders here. - System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves && { - val list = file.list() - list == null || list.isEmpty - } - }) - if (emptyDirs != null) { - emptyDirs.filter(_ != null).foreach(_.delete()) - } + stateSaveHandler.withPool(_.submit(new Runnable { + override def run(): Unit = cleanSaveData() + })) } } diff --git a/src/main/scala/li/cil/oc/server/fs/Buffered.scala b/src/main/scala/li/cil/oc/server/fs/Buffered.scala index 0c1ec256a..b7040227a 100644 --- a/src/main/scala/li/cil/oc/server/fs/Buffered.scala +++ b/src/main/scala/li/cil/oc/server/fs/Buffered.scala @@ -2,13 +2,24 @@ package li.cil.oc.server.fs import java.io import java.io.FileNotFoundException +import java.util.concurrent.CancellationException +import java.util.concurrent.Future +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException +import li.cil.oc.OpenComputers import li.cil.oc.api.fs.Mode +import li.cil.oc.util.ThreadPoolFactory +import li.cil.oc.util.SafeThreadPool import net.minecraft.nbt.NBTTagCompound import org.apache.commons.io.FileUtils import scala.collection.mutable +object Buffered { + val fileSaveHandler: SafeThreadPool = ThreadPoolFactory.createSafePool("FileSystem", 1) +} + trait Buffered extends OutputStreamFileSystem { protected def fileRoot: io.File @@ -34,7 +45,20 @@ trait Buffered extends OutputStreamFileSystem { // ----------------------------------------------------------------------- // - override def load(nbt: NBTTagCompound) = { + private var saving: Option[Future[_]] = None + + override def load(nbt: NBTTagCompound): Unit = { + saving.foreach(f => try { + f.get(120L, TimeUnit.SECONDS) + } catch { + case e: TimeoutException => OpenComputers.log.warn("Waiting for filesystem to save took two minutes! Aborting.") + case e: CancellationException => // NO-OP + }) + loadFiles(nbt) + super.load(nbt) + } + + private def loadFiles(nbt: NBTTagCompound): Unit = this.synchronized { def recurse(path: String, directory: io.File) { makeDirectory(path) for (child <- directory.listFiles() if FileSystem.isValidFilename(child.getName)) { @@ -74,13 +98,16 @@ trait Buffered extends OutputStreamFileSystem { fileRoot.delete() } else recurse("", fileRoot) - - super.load(nbt) } override def save(nbt: NBTTagCompound) = { super.save(nbt) + saving = Buffered.fileSaveHandler.withPool(_.submit(new Runnable { + override def run(): Unit = saveFiles() + })) + } + def saveFiles(): Unit = this.synchronized { for ((path, time) <- deletions) { val file = new io.File(fileRoot, path) if (FileUtils.isFileOlder(file, time)) @@ -88,13 +115,14 @@ trait Buffered extends OutputStreamFileSystem { } deletions.clear() - def recurse(path: String) { + def recurse(path: String):Boolean = { val directory = new io.File(fileRoot, path) directory.mkdirs() + var dirChanged = false for (child <- list(path)) { val childPath = path + child if (isDirectory(childPath)) - recurse(childPath) + dirChanged = recurse(childPath) || dirChanged else { val childFile = new io.File(fileRoot, childPath) val time = lastModified(childPath) @@ -107,10 +135,14 @@ trait Buffered extends OutputStreamFileSystem { out.close() in.close() childFile.setLastModified(time) + dirChanged = true } } } - directory.setLastModified(lastModified(path)) + if (dirChanged) { + directory.setLastModified(lastModified(path)) + true + } else false } if (list("") == null || list("").isEmpty) { fileRoot.delete() diff --git a/src/main/scala/li/cil/oc/util/ThreadPoolFactory.scala b/src/main/scala/li/cil/oc/util/ThreadPoolFactory.scala index 01675fdd5..9c22b02b1 100644 --- a/src/main/scala/li/cil/oc/util/ThreadPoolFactory.scala +++ b/src/main/scala/li/cil/oc/util/ThreadPoolFactory.scala @@ -1,11 +1,17 @@ package li.cil.oc.util import java.util.concurrent.Executors +import java.util.concurrent.Future +import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ThreadFactory +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger +import li.cil.oc.OpenComputers import li.cil.oc.Settings +import scala.collection.mutable + object ThreadPoolFactory { val priority = { val custom = Settings.get.threadPriority @@ -35,4 +41,62 @@ object ThreadPoolFactory { thread } }) + + val safePools: mutable.ArrayBuffer[SafeThreadPool] = mutable.ArrayBuffer.empty[SafeThreadPool] + + def createSafePool(name: String, threads: Int): SafeThreadPool = { + val handler = new SafeThreadPool(name, threads) + safePools += handler + handler + } +} + +class SafeThreadPool(val name: String, val threads: Int) { + private var _threadPool: ScheduledExecutorService = _ + + def withPool(f: ScheduledExecutorService => Future[_], requiresPool: Boolean = true): Option[Future[_]] = { + if (_threadPool == null) { + OpenComputers.log.warn("Error handling file saving: Did the server never start?") + if (requiresPool) { + OpenComputers.log.warn("Creating new thread pool.") + newThreadPool() + } else { + return None + } + } else if (_threadPool.isShutdown || _threadPool.isTerminated) { + OpenComputers.log.warn("Error handling file saving: Thread pool shut down!") + if (requiresPool) { + OpenComputers.log.warn("Creating new thread pool.") + newThreadPool() + } else { + return None + } + } + Option(f(_threadPool)) + } + + def newThreadPool(): Unit = { + if (_threadPool != null && !_threadPool.isTerminated) { + _threadPool.shutdownNow() + } + _threadPool = ThreadPoolFactory.create(name, threads) + } + + def waitForCompletion(): Unit = withPool(threadPool => { + try { + threadPool.shutdown() + var terminated = threadPool.awaitTermination(15, TimeUnit.SECONDS) + if (!terminated) { + OpenComputers.log.warn("Warning: Completing all tasks has already taken 15 seconds!") + terminated = threadPool.awaitTermination(105, TimeUnit.SECONDS) + if (!terminated) { + OpenComputers.log.error("Warning: Completing all tasks has already taken two minutes! Aborting") + threadPool.shutdownNow() + } + } + } catch { + case e: InterruptedException => e.printStackTrace() + } + null + }, requiresPool = false) }