diff --git a/src/main/scala/li/cil/oc/OpenComputers.scala b/src/main/scala/li/cil/oc/OpenComputers.scala index 7584aa098..d6e431185 100644 --- a/src/main/scala/li/cil/oc/OpenComputers.scala +++ b/src/main/scala/li/cil/oc/OpenComputers.scala @@ -9,7 +9,7 @@ import cpw.mods.fml.common.network.FMLEventChannel import li.cil.oc.common.IMC import li.cil.oc.common.Proxy import li.cil.oc.server.command.CommandHandler -import li.cil.oc.server.fs.BufferedFileSaveHandler +import li.cil.oc.util.ThreadPoolFactory import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.Logger @@ -57,12 +57,12 @@ object OpenComputers { @EventHandler def serverStart(e: FMLServerStartingEvent): Unit = { CommandHandler.register(e) - BufferedFileSaveHandler.newThreadPool() + ThreadPoolFactory.safePools.foreach(_.newThreadPool()) } @EventHandler def serverStop(e: FMLServerStoppedEvent): Unit = { - BufferedFileSaveHandler.waitForSaving() + ThreadPoolFactory.safePools.foreach(_.waitForCompletion()) } @EventHandler diff --git a/src/main/scala/li/cil/oc/common/SaveHandler.scala b/src/main/scala/li/cil/oc/common/SaveHandler.scala index 33211e646..8f785e769 100644 --- a/src/main/scala/li/cil/oc/common/SaveHandler.scala +++ b/src/main/scala/li/cil/oc/common/SaveHandler.scala @@ -4,6 +4,11 @@ import java.io import java.io._ import java.nio.file._ import java.nio.file.attribute.BasicFileAttributes +import java.util.concurrent.CancellationException +import java.util.concurrent.ConcurrentLinkedDeque +import java.util.concurrent.Future +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException import cpw.mods.fml.common.eventhandler.EventPriority import cpw.mods.fml.common.eventhandler.SubscribeEvent @@ -12,12 +17,13 @@ import li.cil.oc.Settings import li.cil.oc.api.machine.MachineHost import li.cil.oc.api.network.EnvironmentHost import li.cil.oc.util.BlockPosition +import li.cil.oc.util.SafeThreadPool +import li.cil.oc.util.ThreadPoolFactory import net.minecraft.nbt.CompressedStreamTools import net.minecraft.nbt.NBTTagCompound import net.minecraft.world.ChunkCoordIntPair import net.minecraft.world.World import net.minecraftforge.common.DimensionManager -import net.minecraftforge.event.world.ChunkDataEvent import net.minecraftforge.event.world.WorldEvent import org.apache.commons.lang3.JavaVersion import org.apache.commons.lang3.SystemUtils @@ -42,7 +48,32 @@ object SaveHandler { // which takes a lot of time and is completely unnecessary in those cases. var savingForClients = false - val saveData = mutable.Map.empty[Int, mutable.Map[ChunkCoordIntPair, mutable.Map[String, Array[Byte]]]] + class SaveDataEntry(val data: Array[Byte], val pos: ChunkCoordIntPair, val name: String, val dimension: Int) extends Runnable { + override def run(): Unit = { + val path = statePath + val dimPath = new io.File(path, dimension.toString) + val chunkPath = new io.File(dimPath, s"${this.pos.chunkXPos}.${this.pos.chunkZPos}") + chunkDirs.add(chunkPath) + if (!chunkPath.exists()) { + chunkPath.mkdirs() + } + val file = new io.File(chunkPath, this.name) + try { + // val fos = new GZIPOutputStream(new io.FileOutputStream(file)) + val fos = new io.BufferedOutputStream(new io.FileOutputStream(file)) + fos.write(this.data) + fos.close() + } + catch { + case e: io.IOException => OpenComputers.log.warn(s"Error saving auxiliary tile entity data to '${file.getAbsolutePath}.", e) + } + } + } + + val stateSaveHandler: SafeThreadPool = ThreadPoolFactory.createSafePool("SaveHandler", 1) + + val chunkDirs = new ConcurrentLinkedDeque[io.File]() + val saving = mutable.HashMap.empty[String, Future[_]] def savePath = new io.File(DimensionManager.getCurrentSaveRootDirectory, Settings.savePath) @@ -113,37 +144,34 @@ object SaveHandler { val dimension = nbt.getInteger("dimension") val chunk = new ChunkCoordIntPair(nbt.getInteger("chunkX"), nbt.getInteger("chunkZ")) + // Wait for the latest save task for the requested file to complete. + // This prevents the chance of loading an outdated version + // of this file. + saving.get(name).foreach(f => try { + f.get(120L, TimeUnit.SECONDS) + } catch { + case e: TimeoutException => OpenComputers.log.warn("Waiting for state data to save took two minutes! Aborting.") + case e: CancellationException => // NO-OP + }) + saving.remove(name) + load(dimension, chunk, name) } - def scheduleSave(dimension: Int, chunk: ChunkCoordIntPair, name: String, data: Array[Byte]) = saveData.synchronized { + def scheduleSave(dimension: Int, chunk: ChunkCoordIntPair, name: String, data: Array[Byte]): Unit = { if (chunk == null) throw new IllegalArgumentException("chunk is null") else { - // Make sure we get rid of old versions (e.g. left over by other mods - // triggering a save - this is mostly used for RiM compatibility). We - // need to do this for *each* dimension, in case computers are teleported - // across dimensions. - for (chunks <- saveData.values) chunks.values.foreach(_ -= name) - val chunks = saveData.getOrElseUpdate(dimension, mutable.Map.empty) - chunks.getOrElseUpdate(chunk, mutable.Map.empty) += name -> data + // Disregarding whether or not there already was a + // save submitted for the requested file + // allows for better concurrency at the cost of + // doing more writing operations. + stateSaveHandler.withPool(_.submit(new SaveDataEntry(data, chunk, name, dimension))).foreach(saving.put(name, _)) } } def load(dimension: Int, chunk: ChunkCoordIntPair, name: String): Array[Byte] = { if (chunk == null) throw new IllegalArgumentException("chunk is null") - // Use data from 'cache' if possible. This avoids weird things happening - // when writeToNBT+readFromNBT is called by other mods (i.e. this is not - // used to actually save the data to disk). - saveData.get(dimension) match { - case Some(chunks) => chunks.get(chunk) match { - case Some(map) => map.get(name) match { - case Some(data) => return data - case _ => - } - case _ => - } - case _ => - } + val path = statePath val dimPath = new io.File(path, dimension.toString) val chunkPath = new io.File(dimPath, s"${chunk.chunkXPos}.${chunk.chunkZPos}") @@ -171,35 +199,28 @@ object SaveHandler { } } - @SubscribeEvent - def onChunkSave(e: ChunkDataEvent.Save) = saveData.synchronized { - val path = statePath - val dimension = e.world.provider.dimensionId - val chunk = e.getChunk.getChunkCoordIntPair - val dimPath = new io.File(path, dimension.toString) - val chunkPath = new io.File(dimPath, s"${chunk.chunkXPos}.${chunk.chunkZPos}") - if (chunkPath.exists && chunkPath.isDirectory && chunkPath.list() != null) { - for (file <- chunkPath.listFiles() if System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves) file.delete() - } - saveData.get(dimension) match { - case Some(chunks) => chunks.get(chunk) match { - case Some(entries) => - chunkPath.mkdirs() - for ((name, data) <- entries) { - val file = new io.File(chunkPath, name) - try { - // val fos = new GZIPOutputStream(new io.FileOutputStream(file)) - val fos = new io.BufferedOutputStream(new io.FileOutputStream(file)) - fos.write(data) - fos.close() - } - catch { - case e: io.IOException => OpenComputers.log.warn(s"Error saving auxiliary tile entity data to '${file.getAbsolutePath}.", e) - } - } - case _ => chunkPath.delete() + def cleanSaveData(): Unit = { + while (!chunkDirs.isEmpty) { + val chunkPath = chunkDirs.poll() + if (chunkPath.exists && chunkPath.isDirectory && chunkPath.list() != null) { + for (file <- chunkPath.listFiles() if System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves) file.delete() } - case _ => + } + + // Delete empty folders to keep the state folder clean. + val emptyDirs = savePath.listFiles(new FileFilter { + override def accept(file: File) = file.isDirectory && + // Make sure we only consider file system folders (UUID). + file.getName.matches(uuidRegex) && + // We set the modified time in the save() method of unbuffered file + // systems, to avoid deleting in-use folders here. + System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves && { + val list = file.list() + list == null || list.isEmpty + } + }) + if (emptyDirs != null) { + emptyDirs.filter(_ != null).foreach(_.delete()) } } @@ -224,28 +245,9 @@ object SaveHandler { @SubscribeEvent(priority = EventPriority.LOWEST) def onWorldSave(e: WorldEvent.Save) { - saveData.synchronized { - saveData.get(e.world.provider.dimensionId) match { - case Some(chunks) => chunks.clear() - case _ => - } - } - - // Delete empty folders to keep the state folder clean. - val emptyDirs = savePath.listFiles(new FileFilter { - override def accept(file: File) = file.isDirectory && - // Make sure we only consider file system folders (UUID). - file.getName.matches(uuidRegex) && - // We set the modified time in the save() method of unbuffered file - // systems, to avoid deleting in-use folders here. - System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves && { - val list = file.list() - list == null || list.isEmpty - } - }) - if (emptyDirs != null) { - emptyDirs.filter(_ != null).foreach(_.delete()) - } + stateSaveHandler.withPool(_.submit(new Runnable { + override def run(): Unit = cleanSaveData() + })) } } diff --git a/src/main/scala/li/cil/oc/server/fs/Buffered.scala b/src/main/scala/li/cil/oc/server/fs/Buffered.scala index 3373c7257..b7040227a 100644 --- a/src/main/scala/li/cil/oc/server/fs/Buffered.scala +++ b/src/main/scala/li/cil/oc/server/fs/Buffered.scala @@ -9,11 +9,17 @@ import java.util.concurrent.TimeoutException import li.cil.oc.OpenComputers import li.cil.oc.api.fs.Mode +import li.cil.oc.util.ThreadPoolFactory +import li.cil.oc.util.SafeThreadPool import net.minecraft.nbt.NBTTagCompound import org.apache.commons.io.FileUtils import scala.collection.mutable +object Buffered { + val fileSaveHandler: SafeThreadPool = ThreadPoolFactory.createSafePool("FileSystem", 1) +} + trait Buffered extends OutputStreamFileSystem { protected def fileRoot: io.File @@ -58,7 +64,7 @@ trait Buffered extends OutputStreamFileSystem { for (child <- directory.listFiles() if FileSystem.isValidFilename(child.getName)) { val childPath = path + child.getName val childFile = new io.File(directory, child.getName) - if (child.exists() && child .isDirectory && child.list() != null) { + if (child.exists() && child.isDirectory && child.list() != null) { recurse(childPath + "/", childFile) } else if (!exists(childPath) || !isDirectory(childPath)) { @@ -96,7 +102,9 @@ trait Buffered extends OutputStreamFileSystem { override def save(nbt: NBTTagCompound) = { super.save(nbt) - saving = BufferedFileSaveHandler.scheduleSave(this) + saving = Buffered.fileSaveHandler.withPool(_.submit(new Runnable { + override def run(): Unit = saveFiles() + })) } def saveFiles(): Unit = this.synchronized { diff --git a/src/main/scala/li/cil/oc/server/fs/BufferedFileSaveHandler.scala b/src/main/scala/li/cil/oc/server/fs/BufferedFileSaveHandler.scala deleted file mode 100644 index da3d5aeb4..000000000 --- a/src/main/scala/li/cil/oc/server/fs/BufferedFileSaveHandler.scala +++ /dev/null @@ -1,63 +0,0 @@ -package li.cil.oc.server.fs - -import java.util.concurrent.Future -import java.util.concurrent.ScheduledExecutorService -import java.util.concurrent.TimeUnit - -import li.cil.oc.OpenComputers -import li.cil.oc.util.ThreadPoolFactory - -object BufferedFileSaveHandler { - - private var _threadPool: ScheduledExecutorService = _ - - private def withPool(f: ScheduledExecutorService => Future[_], requiresPool: Boolean = true): Option[Future[_]] = { - if (_threadPool == null) { - OpenComputers.log.warn("Error handling file saving: Did the server never start?") - if (requiresPool) { - OpenComputers.log.warn("Creating new thread pool.") - newThreadPool() - } else { - return None - } - } else if (_threadPool.isShutdown || _threadPool.isTerminated) { - OpenComputers.log.warn("Error handling file saving: Thread pool shut down!") - if (requiresPool) { - OpenComputers.log.warn("Creating new thread pool.") - newThreadPool() - } else { - return None - } - } - Option(f(_threadPool)) - } - - def newThreadPool(): Unit = { - if (_threadPool != null && !_threadPool.isTerminated) { - _threadPool.shutdownNow() - } - _threadPool = ThreadPoolFactory.create("FileSystem", 1) - } - - def scheduleSave(fs: Buffered): Option[Future[_]] = withPool(threadPool => threadPool.submit(new Runnable { - override def run(): Unit = fs.saveFiles() - })) - - def waitForSaving(): Unit = withPool(threadPool => { - try { - threadPool.shutdown() - var terminated = threadPool.awaitTermination(15, TimeUnit.SECONDS) - if (!terminated) { - OpenComputers.log.warn("Warning: Saving the filesystem has already taken 15 seconds!") - terminated = threadPool.awaitTermination(105, TimeUnit.SECONDS) - if (!terminated) { - OpenComputers.log.error("Warning: Saving the filesystem has already taken two minutes! Aborting") - threadPool.shutdownNow() - } - } - } catch { - case e: InterruptedException => e.printStackTrace() - } - null - }, requiresPool = false) -} diff --git a/src/main/scala/li/cil/oc/util/ThreadPoolFactory.scala b/src/main/scala/li/cil/oc/util/ThreadPoolFactory.scala index 01675fdd5..9c22b02b1 100644 --- a/src/main/scala/li/cil/oc/util/ThreadPoolFactory.scala +++ b/src/main/scala/li/cil/oc/util/ThreadPoolFactory.scala @@ -1,11 +1,17 @@ package li.cil.oc.util import java.util.concurrent.Executors +import java.util.concurrent.Future +import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ThreadFactory +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger +import li.cil.oc.OpenComputers import li.cil.oc.Settings +import scala.collection.mutable + object ThreadPoolFactory { val priority = { val custom = Settings.get.threadPriority @@ -35,4 +41,62 @@ object ThreadPoolFactory { thread } }) + + val safePools: mutable.ArrayBuffer[SafeThreadPool] = mutable.ArrayBuffer.empty[SafeThreadPool] + + def createSafePool(name: String, threads: Int): SafeThreadPool = { + val handler = new SafeThreadPool(name, threads) + safePools += handler + handler + } +} + +class SafeThreadPool(val name: String, val threads: Int) { + private var _threadPool: ScheduledExecutorService = _ + + def withPool(f: ScheduledExecutorService => Future[_], requiresPool: Boolean = true): Option[Future[_]] = { + if (_threadPool == null) { + OpenComputers.log.warn("Error handling file saving: Did the server never start?") + if (requiresPool) { + OpenComputers.log.warn("Creating new thread pool.") + newThreadPool() + } else { + return None + } + } else if (_threadPool.isShutdown || _threadPool.isTerminated) { + OpenComputers.log.warn("Error handling file saving: Thread pool shut down!") + if (requiresPool) { + OpenComputers.log.warn("Creating new thread pool.") + newThreadPool() + } else { + return None + } + } + Option(f(_threadPool)) + } + + def newThreadPool(): Unit = { + if (_threadPool != null && !_threadPool.isTerminated) { + _threadPool.shutdownNow() + } + _threadPool = ThreadPoolFactory.create(name, threads) + } + + def waitForCompletion(): Unit = withPool(threadPool => { + try { + threadPool.shutdown() + var terminated = threadPool.awaitTermination(15, TimeUnit.SECONDS) + if (!terminated) { + OpenComputers.log.warn("Warning: Completing all tasks has already taken 15 seconds!") + terminated = threadPool.awaitTermination(105, TimeUnit.SECONDS) + if (!terminated) { + OpenComputers.log.error("Warning: Completing all tasks has already taken two minutes! Aborting") + threadPool.shutdownNow() + } + } + } catch { + case e: InterruptedException => e.printStackTrace() + } + null + }, requiresPool = false) }