From db18c1df6214e5a91788140de4d3076949bff45c Mon Sep 17 00:00:00 2001 From: Vexatos Date: Fri, 16 Feb 2018 00:48:56 +0100 Subject: [PATCH 1/3] Only change date of directory modification if it actually changed. (cherry picked from commit c463df0) --- src/main/scala/li/cil/oc/server/fs/Buffered.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/main/scala/li/cil/oc/server/fs/Buffered.scala b/src/main/scala/li/cil/oc/server/fs/Buffered.scala index 0c1ec256a..5c26bcbbd 100644 --- a/src/main/scala/li/cil/oc/server/fs/Buffered.scala +++ b/src/main/scala/li/cil/oc/server/fs/Buffered.scala @@ -88,13 +88,14 @@ trait Buffered extends OutputStreamFileSystem { } deletions.clear() - def recurse(path: String) { + def recurse(path: String):Boolean = { val directory = new io.File(fileRoot, path) directory.mkdirs() + var dirChanged = false for (child <- list(path)) { val childPath = path + child if (isDirectory(childPath)) - recurse(childPath) + dirChanged = recurse(childPath) || dirChanged else { val childFile = new io.File(fileRoot, childPath) val time = lastModified(childPath) @@ -107,10 +108,14 @@ trait Buffered extends OutputStreamFileSystem { out.close() in.close() childFile.setLastModified(time) + dirChanged = true } } } - directory.setLastModified(lastModified(path)) + if (dirChanged) { + directory.setLastModified(lastModified(path)) + true + } else false } if (list("") == null || list("").isEmpty) { fileRoot.delete() From a8a421e25a3d5efd28e3ce2127e7bc252dd2add9 Mon Sep 17 00:00:00 2001 From: Vexatos Date: Fri, 16 Feb 2018 01:32:12 +0100 Subject: [PATCH 2/3] Implemented threaded filesystem saving. (cherry picked from commit f129282) --- src/main/scala/li/cil/oc/OpenComputers.scala | 11 +++- .../scala/li/cil/oc/server/fs/Buffered.scala | 27 ++++++-- .../server/fs/BufferedFileSaveHandler.scala | 63 +++++++++++++++++++ 3 files changed, 96 insertions(+), 5 deletions(-) create mode 100644 src/main/scala/li/cil/oc/server/fs/BufferedFileSaveHandler.scala diff --git a/src/main/scala/li/cil/oc/OpenComputers.scala b/src/main/scala/li/cil/oc/OpenComputers.scala index 9f4d99f06..7584aa098 100644 --- a/src/main/scala/li/cil/oc/OpenComputers.scala +++ b/src/main/scala/li/cil/oc/OpenComputers.scala @@ -9,6 +9,7 @@ import cpw.mods.fml.common.network.FMLEventChannel import li.cil.oc.common.IMC import li.cil.oc.common.Proxy import li.cil.oc.server.command.CommandHandler +import li.cil.oc.server.fs.BufferedFileSaveHandler import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.Logger @@ -54,7 +55,15 @@ object OpenComputers { def missingMappings(e: FMLMissingMappingsEvent) = proxy.missingMappings(e) @EventHandler - def serverStart(e: FMLServerStartingEvent) = CommandHandler.register(e) + def serverStart(e: FMLServerStartingEvent): Unit = { + CommandHandler.register(e) + BufferedFileSaveHandler.newThreadPool() + } + + @EventHandler + def serverStop(e: FMLServerStoppedEvent): Unit = { + BufferedFileSaveHandler.waitForSaving() + } @EventHandler def imc(e: IMCEvent) = IMC.handleEvent(e) diff --git a/src/main/scala/li/cil/oc/server/fs/Buffered.scala b/src/main/scala/li/cil/oc/server/fs/Buffered.scala index 5c26bcbbd..3373c7257 100644 --- a/src/main/scala/li/cil/oc/server/fs/Buffered.scala +++ b/src/main/scala/li/cil/oc/server/fs/Buffered.scala @@ -2,7 +2,12 @@ package li.cil.oc.server.fs import java.io import java.io.FileNotFoundException +import java.util.concurrent.CancellationException +import java.util.concurrent.Future +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException +import li.cil.oc.OpenComputers import li.cil.oc.api.fs.Mode import net.minecraft.nbt.NBTTagCompound import org.apache.commons.io.FileUtils @@ -34,13 +39,26 @@ trait Buffered extends OutputStreamFileSystem { // ----------------------------------------------------------------------- // - override def load(nbt: NBTTagCompound) = { + private var saving: Option[Future[_]] = None + + override def load(nbt: NBTTagCompound): Unit = { + saving.foreach(f => try { + f.get(120L, TimeUnit.SECONDS) + } catch { + case e: TimeoutException => OpenComputers.log.warn("Waiting for filesystem to save took two minutes! Aborting.") + case e: CancellationException => // NO-OP + }) + loadFiles(nbt) + super.load(nbt) + } + + private def loadFiles(nbt: NBTTagCompound): Unit = this.synchronized { def recurse(path: String, directory: io.File) { makeDirectory(path) for (child <- directory.listFiles() if FileSystem.isValidFilename(child.getName)) { val childPath = path + child.getName val childFile = new io.File(directory, child.getName) - if (child.exists() && child.isDirectory && child.list() != null) { + if (child.exists() && child .isDirectory && child.list() != null) { recurse(childPath + "/", childFile) } else if (!exists(childPath) || !isDirectory(childPath)) { @@ -74,13 +92,14 @@ trait Buffered extends OutputStreamFileSystem { fileRoot.delete() } else recurse("", fileRoot) - - super.load(nbt) } override def save(nbt: NBTTagCompound) = { super.save(nbt) + saving = BufferedFileSaveHandler.scheduleSave(this) + } + def saveFiles(): Unit = this.synchronized { for ((path, time) <- deletions) { val file = new io.File(fileRoot, path) if (FileUtils.isFileOlder(file, time)) diff --git a/src/main/scala/li/cil/oc/server/fs/BufferedFileSaveHandler.scala b/src/main/scala/li/cil/oc/server/fs/BufferedFileSaveHandler.scala new file mode 100644 index 000000000..da3d5aeb4 --- /dev/null +++ b/src/main/scala/li/cil/oc/server/fs/BufferedFileSaveHandler.scala @@ -0,0 +1,63 @@ +package li.cil.oc.server.fs + +import java.util.concurrent.Future +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit + +import li.cil.oc.OpenComputers +import li.cil.oc.util.ThreadPoolFactory + +object BufferedFileSaveHandler { + + private var _threadPool: ScheduledExecutorService = _ + + private def withPool(f: ScheduledExecutorService => Future[_], requiresPool: Boolean = true): Option[Future[_]] = { + if (_threadPool == null) { + OpenComputers.log.warn("Error handling file saving: Did the server never start?") + if (requiresPool) { + OpenComputers.log.warn("Creating new thread pool.") + newThreadPool() + } else { + return None + } + } else if (_threadPool.isShutdown || _threadPool.isTerminated) { + OpenComputers.log.warn("Error handling file saving: Thread pool shut down!") + if (requiresPool) { + OpenComputers.log.warn("Creating new thread pool.") + newThreadPool() + } else { + return None + } + } + Option(f(_threadPool)) + } + + def newThreadPool(): Unit = { + if (_threadPool != null && !_threadPool.isTerminated) { + _threadPool.shutdownNow() + } + _threadPool = ThreadPoolFactory.create("FileSystem", 1) + } + + def scheduleSave(fs: Buffered): Option[Future[_]] = withPool(threadPool => threadPool.submit(new Runnable { + override def run(): Unit = fs.saveFiles() + })) + + def waitForSaving(): Unit = withPool(threadPool => { + try { + threadPool.shutdown() + var terminated = threadPool.awaitTermination(15, TimeUnit.SECONDS) + if (!terminated) { + OpenComputers.log.warn("Warning: Saving the filesystem has already taken 15 seconds!") + terminated = threadPool.awaitTermination(105, TimeUnit.SECONDS) + if (!terminated) { + OpenComputers.log.error("Warning: Saving the filesystem has already taken two minutes! Aborting") + threadPool.shutdownNow() + } + } + } catch { + case e: InterruptedException => e.printStackTrace() + } + null + }, requiresPool = false) +} From 4010927a3ee04700d80fdeade705fca885bc7c92 Mon Sep 17 00:00:00 2001 From: Vexatos Date: Fri, 16 Feb 2018 01:32:12 +0100 Subject: [PATCH 3/3] Offloading state saving into a separate thread. (cherry picked from commit 0b256e0) --- src/main/scala/li/cil/oc/OpenComputers.scala | 6 +- .../scala/li/cil/oc/common/SaveHandler.scala | 148 +++++++++--------- .../scala/li/cil/oc/server/fs/Buffered.scala | 12 +- .../server/fs/BufferedFileSaveHandler.scala | 63 -------- .../li/cil/oc/util/ThreadPoolFactory.scala | 64 ++++++++ 5 files changed, 152 insertions(+), 141 deletions(-) delete mode 100644 src/main/scala/li/cil/oc/server/fs/BufferedFileSaveHandler.scala diff --git a/src/main/scala/li/cil/oc/OpenComputers.scala b/src/main/scala/li/cil/oc/OpenComputers.scala index 7584aa098..d6e431185 100644 --- a/src/main/scala/li/cil/oc/OpenComputers.scala +++ b/src/main/scala/li/cil/oc/OpenComputers.scala @@ -9,7 +9,7 @@ import cpw.mods.fml.common.network.FMLEventChannel import li.cil.oc.common.IMC import li.cil.oc.common.Proxy import li.cil.oc.server.command.CommandHandler -import li.cil.oc.server.fs.BufferedFileSaveHandler +import li.cil.oc.util.ThreadPoolFactory import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.Logger @@ -57,12 +57,12 @@ object OpenComputers { @EventHandler def serverStart(e: FMLServerStartingEvent): Unit = { CommandHandler.register(e) - BufferedFileSaveHandler.newThreadPool() + ThreadPoolFactory.safePools.foreach(_.newThreadPool()) } @EventHandler def serverStop(e: FMLServerStoppedEvent): Unit = { - BufferedFileSaveHandler.waitForSaving() + ThreadPoolFactory.safePools.foreach(_.waitForCompletion()) } @EventHandler diff --git a/src/main/scala/li/cil/oc/common/SaveHandler.scala b/src/main/scala/li/cil/oc/common/SaveHandler.scala index 33211e646..8f785e769 100644 --- a/src/main/scala/li/cil/oc/common/SaveHandler.scala +++ b/src/main/scala/li/cil/oc/common/SaveHandler.scala @@ -4,6 +4,11 @@ import java.io import java.io._ import java.nio.file._ import java.nio.file.attribute.BasicFileAttributes +import java.util.concurrent.CancellationException +import java.util.concurrent.ConcurrentLinkedDeque +import java.util.concurrent.Future +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException import cpw.mods.fml.common.eventhandler.EventPriority import cpw.mods.fml.common.eventhandler.SubscribeEvent @@ -12,12 +17,13 @@ import li.cil.oc.Settings import li.cil.oc.api.machine.MachineHost import li.cil.oc.api.network.EnvironmentHost import li.cil.oc.util.BlockPosition +import li.cil.oc.util.SafeThreadPool +import li.cil.oc.util.ThreadPoolFactory import net.minecraft.nbt.CompressedStreamTools import net.minecraft.nbt.NBTTagCompound import net.minecraft.world.ChunkCoordIntPair import net.minecraft.world.World import net.minecraftforge.common.DimensionManager -import net.minecraftforge.event.world.ChunkDataEvent import net.minecraftforge.event.world.WorldEvent import org.apache.commons.lang3.JavaVersion import org.apache.commons.lang3.SystemUtils @@ -42,7 +48,32 @@ object SaveHandler { // which takes a lot of time and is completely unnecessary in those cases. var savingForClients = false - val saveData = mutable.Map.empty[Int, mutable.Map[ChunkCoordIntPair, mutable.Map[String, Array[Byte]]]] + class SaveDataEntry(val data: Array[Byte], val pos: ChunkCoordIntPair, val name: String, val dimension: Int) extends Runnable { + override def run(): Unit = { + val path = statePath + val dimPath = new io.File(path, dimension.toString) + val chunkPath = new io.File(dimPath, s"${this.pos.chunkXPos}.${this.pos.chunkZPos}") + chunkDirs.add(chunkPath) + if (!chunkPath.exists()) { + chunkPath.mkdirs() + } + val file = new io.File(chunkPath, this.name) + try { + // val fos = new GZIPOutputStream(new io.FileOutputStream(file)) + val fos = new io.BufferedOutputStream(new io.FileOutputStream(file)) + fos.write(this.data) + fos.close() + } + catch { + case e: io.IOException => OpenComputers.log.warn(s"Error saving auxiliary tile entity data to '${file.getAbsolutePath}.", e) + } + } + } + + val stateSaveHandler: SafeThreadPool = ThreadPoolFactory.createSafePool("SaveHandler", 1) + + val chunkDirs = new ConcurrentLinkedDeque[io.File]() + val saving = mutable.HashMap.empty[String, Future[_]] def savePath = new io.File(DimensionManager.getCurrentSaveRootDirectory, Settings.savePath) @@ -113,37 +144,34 @@ object SaveHandler { val dimension = nbt.getInteger("dimension") val chunk = new ChunkCoordIntPair(nbt.getInteger("chunkX"), nbt.getInteger("chunkZ")) + // Wait for the latest save task for the requested file to complete. + // This prevents the chance of loading an outdated version + // of this file. + saving.get(name).foreach(f => try { + f.get(120L, TimeUnit.SECONDS) + } catch { + case e: TimeoutException => OpenComputers.log.warn("Waiting for state data to save took two minutes! Aborting.") + case e: CancellationException => // NO-OP + }) + saving.remove(name) + load(dimension, chunk, name) } - def scheduleSave(dimension: Int, chunk: ChunkCoordIntPair, name: String, data: Array[Byte]) = saveData.synchronized { + def scheduleSave(dimension: Int, chunk: ChunkCoordIntPair, name: String, data: Array[Byte]): Unit = { if (chunk == null) throw new IllegalArgumentException("chunk is null") else { - // Make sure we get rid of old versions (e.g. left over by other mods - // triggering a save - this is mostly used for RiM compatibility). We - // need to do this for *each* dimension, in case computers are teleported - // across dimensions. - for (chunks <- saveData.values) chunks.values.foreach(_ -= name) - val chunks = saveData.getOrElseUpdate(dimension, mutable.Map.empty) - chunks.getOrElseUpdate(chunk, mutable.Map.empty) += name -> data + // Disregarding whether or not there already was a + // save submitted for the requested file + // allows for better concurrency at the cost of + // doing more writing operations. + stateSaveHandler.withPool(_.submit(new SaveDataEntry(data, chunk, name, dimension))).foreach(saving.put(name, _)) } } def load(dimension: Int, chunk: ChunkCoordIntPair, name: String): Array[Byte] = { if (chunk == null) throw new IllegalArgumentException("chunk is null") - // Use data from 'cache' if possible. This avoids weird things happening - // when writeToNBT+readFromNBT is called by other mods (i.e. this is not - // used to actually save the data to disk). - saveData.get(dimension) match { - case Some(chunks) => chunks.get(chunk) match { - case Some(map) => map.get(name) match { - case Some(data) => return data - case _ => - } - case _ => - } - case _ => - } + val path = statePath val dimPath = new io.File(path, dimension.toString) val chunkPath = new io.File(dimPath, s"${chunk.chunkXPos}.${chunk.chunkZPos}") @@ -171,35 +199,28 @@ object SaveHandler { } } - @SubscribeEvent - def onChunkSave(e: ChunkDataEvent.Save) = saveData.synchronized { - val path = statePath - val dimension = e.world.provider.dimensionId - val chunk = e.getChunk.getChunkCoordIntPair - val dimPath = new io.File(path, dimension.toString) - val chunkPath = new io.File(dimPath, s"${chunk.chunkXPos}.${chunk.chunkZPos}") - if (chunkPath.exists && chunkPath.isDirectory && chunkPath.list() != null) { - for (file <- chunkPath.listFiles() if System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves) file.delete() - } - saveData.get(dimension) match { - case Some(chunks) => chunks.get(chunk) match { - case Some(entries) => - chunkPath.mkdirs() - for ((name, data) <- entries) { - val file = new io.File(chunkPath, name) - try { - // val fos = new GZIPOutputStream(new io.FileOutputStream(file)) - val fos = new io.BufferedOutputStream(new io.FileOutputStream(file)) - fos.write(data) - fos.close() - } - catch { - case e: io.IOException => OpenComputers.log.warn(s"Error saving auxiliary tile entity data to '${file.getAbsolutePath}.", e) - } - } - case _ => chunkPath.delete() + def cleanSaveData(): Unit = { + while (!chunkDirs.isEmpty) { + val chunkPath = chunkDirs.poll() + if (chunkPath.exists && chunkPath.isDirectory && chunkPath.list() != null) { + for (file <- chunkPath.listFiles() if System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves) file.delete() } - case _ => + } + + // Delete empty folders to keep the state folder clean. + val emptyDirs = savePath.listFiles(new FileFilter { + override def accept(file: File) = file.isDirectory && + // Make sure we only consider file system folders (UUID). + file.getName.matches(uuidRegex) && + // We set the modified time in the save() method of unbuffered file + // systems, to avoid deleting in-use folders here. + System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves && { + val list = file.list() + list == null || list.isEmpty + } + }) + if (emptyDirs != null) { + emptyDirs.filter(_ != null).foreach(_.delete()) } } @@ -224,28 +245,9 @@ object SaveHandler { @SubscribeEvent(priority = EventPriority.LOWEST) def onWorldSave(e: WorldEvent.Save) { - saveData.synchronized { - saveData.get(e.world.provider.dimensionId) match { - case Some(chunks) => chunks.clear() - case _ => - } - } - - // Delete empty folders to keep the state folder clean. - val emptyDirs = savePath.listFiles(new FileFilter { - override def accept(file: File) = file.isDirectory && - // Make sure we only consider file system folders (UUID). - file.getName.matches(uuidRegex) && - // We set the modified time in the save() method of unbuffered file - // systems, to avoid deleting in-use folders here. - System.currentTimeMillis() - file.lastModified() > TimeToHoldOntoOldSaves && { - val list = file.list() - list == null || list.isEmpty - } - }) - if (emptyDirs != null) { - emptyDirs.filter(_ != null).foreach(_.delete()) - } + stateSaveHandler.withPool(_.submit(new Runnable { + override def run(): Unit = cleanSaveData() + })) } } diff --git a/src/main/scala/li/cil/oc/server/fs/Buffered.scala b/src/main/scala/li/cil/oc/server/fs/Buffered.scala index 3373c7257..b7040227a 100644 --- a/src/main/scala/li/cil/oc/server/fs/Buffered.scala +++ b/src/main/scala/li/cil/oc/server/fs/Buffered.scala @@ -9,11 +9,17 @@ import java.util.concurrent.TimeoutException import li.cil.oc.OpenComputers import li.cil.oc.api.fs.Mode +import li.cil.oc.util.ThreadPoolFactory +import li.cil.oc.util.SafeThreadPool import net.minecraft.nbt.NBTTagCompound import org.apache.commons.io.FileUtils import scala.collection.mutable +object Buffered { + val fileSaveHandler: SafeThreadPool = ThreadPoolFactory.createSafePool("FileSystem", 1) +} + trait Buffered extends OutputStreamFileSystem { protected def fileRoot: io.File @@ -58,7 +64,7 @@ trait Buffered extends OutputStreamFileSystem { for (child <- directory.listFiles() if FileSystem.isValidFilename(child.getName)) { val childPath = path + child.getName val childFile = new io.File(directory, child.getName) - if (child.exists() && child .isDirectory && child.list() != null) { + if (child.exists() && child.isDirectory && child.list() != null) { recurse(childPath + "/", childFile) } else if (!exists(childPath) || !isDirectory(childPath)) { @@ -96,7 +102,9 @@ trait Buffered extends OutputStreamFileSystem { override def save(nbt: NBTTagCompound) = { super.save(nbt) - saving = BufferedFileSaveHandler.scheduleSave(this) + saving = Buffered.fileSaveHandler.withPool(_.submit(new Runnable { + override def run(): Unit = saveFiles() + })) } def saveFiles(): Unit = this.synchronized { diff --git a/src/main/scala/li/cil/oc/server/fs/BufferedFileSaveHandler.scala b/src/main/scala/li/cil/oc/server/fs/BufferedFileSaveHandler.scala deleted file mode 100644 index da3d5aeb4..000000000 --- a/src/main/scala/li/cil/oc/server/fs/BufferedFileSaveHandler.scala +++ /dev/null @@ -1,63 +0,0 @@ -package li.cil.oc.server.fs - -import java.util.concurrent.Future -import java.util.concurrent.ScheduledExecutorService -import java.util.concurrent.TimeUnit - -import li.cil.oc.OpenComputers -import li.cil.oc.util.ThreadPoolFactory - -object BufferedFileSaveHandler { - - private var _threadPool: ScheduledExecutorService = _ - - private def withPool(f: ScheduledExecutorService => Future[_], requiresPool: Boolean = true): Option[Future[_]] = { - if (_threadPool == null) { - OpenComputers.log.warn("Error handling file saving: Did the server never start?") - if (requiresPool) { - OpenComputers.log.warn("Creating new thread pool.") - newThreadPool() - } else { - return None - } - } else if (_threadPool.isShutdown || _threadPool.isTerminated) { - OpenComputers.log.warn("Error handling file saving: Thread pool shut down!") - if (requiresPool) { - OpenComputers.log.warn("Creating new thread pool.") - newThreadPool() - } else { - return None - } - } - Option(f(_threadPool)) - } - - def newThreadPool(): Unit = { - if (_threadPool != null && !_threadPool.isTerminated) { - _threadPool.shutdownNow() - } - _threadPool = ThreadPoolFactory.create("FileSystem", 1) - } - - def scheduleSave(fs: Buffered): Option[Future[_]] = withPool(threadPool => threadPool.submit(new Runnable { - override def run(): Unit = fs.saveFiles() - })) - - def waitForSaving(): Unit = withPool(threadPool => { - try { - threadPool.shutdown() - var terminated = threadPool.awaitTermination(15, TimeUnit.SECONDS) - if (!terminated) { - OpenComputers.log.warn("Warning: Saving the filesystem has already taken 15 seconds!") - terminated = threadPool.awaitTermination(105, TimeUnit.SECONDS) - if (!terminated) { - OpenComputers.log.error("Warning: Saving the filesystem has already taken two minutes! Aborting") - threadPool.shutdownNow() - } - } - } catch { - case e: InterruptedException => e.printStackTrace() - } - null - }, requiresPool = false) -} diff --git a/src/main/scala/li/cil/oc/util/ThreadPoolFactory.scala b/src/main/scala/li/cil/oc/util/ThreadPoolFactory.scala index 01675fdd5..9c22b02b1 100644 --- a/src/main/scala/li/cil/oc/util/ThreadPoolFactory.scala +++ b/src/main/scala/li/cil/oc/util/ThreadPoolFactory.scala @@ -1,11 +1,17 @@ package li.cil.oc.util import java.util.concurrent.Executors +import java.util.concurrent.Future +import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ThreadFactory +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger +import li.cil.oc.OpenComputers import li.cil.oc.Settings +import scala.collection.mutable + object ThreadPoolFactory { val priority = { val custom = Settings.get.threadPriority @@ -35,4 +41,62 @@ object ThreadPoolFactory { thread } }) + + val safePools: mutable.ArrayBuffer[SafeThreadPool] = mutable.ArrayBuffer.empty[SafeThreadPool] + + def createSafePool(name: String, threads: Int): SafeThreadPool = { + val handler = new SafeThreadPool(name, threads) + safePools += handler + handler + } +} + +class SafeThreadPool(val name: String, val threads: Int) { + private var _threadPool: ScheduledExecutorService = _ + + def withPool(f: ScheduledExecutorService => Future[_], requiresPool: Boolean = true): Option[Future[_]] = { + if (_threadPool == null) { + OpenComputers.log.warn("Error handling file saving: Did the server never start?") + if (requiresPool) { + OpenComputers.log.warn("Creating new thread pool.") + newThreadPool() + } else { + return None + } + } else if (_threadPool.isShutdown || _threadPool.isTerminated) { + OpenComputers.log.warn("Error handling file saving: Thread pool shut down!") + if (requiresPool) { + OpenComputers.log.warn("Creating new thread pool.") + newThreadPool() + } else { + return None + } + } + Option(f(_threadPool)) + } + + def newThreadPool(): Unit = { + if (_threadPool != null && !_threadPool.isTerminated) { + _threadPool.shutdownNow() + } + _threadPool = ThreadPoolFactory.create(name, threads) + } + + def waitForCompletion(): Unit = withPool(threadPool => { + try { + threadPool.shutdown() + var terminated = threadPool.awaitTermination(15, TimeUnit.SECONDS) + if (!terminated) { + OpenComputers.log.warn("Warning: Completing all tasks has already taken 15 seconds!") + terminated = threadPool.awaitTermination(105, TimeUnit.SECONDS) + if (!terminated) { + OpenComputers.log.error("Warning: Completing all tasks has already taken two minutes! Aborting") + threadPool.shutdownNow() + } + } + } catch { + case e: InterruptedException => e.printStackTrace() + } + null + }, requiresPool = false) }