Replace scheduler to executor

This commit is contained in:
huanghongxun 2019-02-26 11:39:38 +08:00
parent 15a75a69b3
commit 3c958d06c8
11 changed files with 243 additions and 385 deletions

View File

@ -102,7 +102,7 @@ public final class LauncherHelper {
try {
checkGameState(profile, setting, version, () -> {
Controllers.dialog(launchingStepsPane);
Schedulers.newThread().schedule(this::launch0);
Schedulers.newThread().execute(this::launch0);
});
} catch (InterruptedException ignore) {
}

View File

@ -25,6 +25,7 @@ import org.jackhuang.hmcl.task.Schedulers;
import org.jackhuang.hmcl.task.Task;
import java.util.Objects;
import java.util.concurrent.Executor;
public final class MultiMCInstallVersionSettingTask extends Task<Void> {
private final Profile profile;
@ -35,11 +36,8 @@ public final class MultiMCInstallVersionSettingTask extends Task<Void> {
this.profile = profile;
this.manifest = manifest;
this.version = version;
}
@Override
public Scheduler getScheduler() {
return Schedulers.javafx();
setExecutor(Schedulers.javafx());
}
@Override

View File

@ -197,7 +197,7 @@ public final class Accounts {
Account selected = selectedAccount.get();
if (selected != null) {
Schedulers.io().schedule(() -> {
Schedulers.io().execute(() -> {
try {
selected.logIn();
} catch (AuthenticationException e) {
@ -209,7 +209,7 @@ public final class Accounts {
for (AuthlibInjectorServer server : config().getAuthlibInjectorServers()) {
if (selected instanceof AuthlibInjectorAccount && ((AuthlibInjectorAccount) selected).getServer() == server)
continue;
Schedulers.io().schedule(() -> {
Schedulers.io().execute(() -> {
try {
server.fetchMetadataResponse();
} catch (IOException e) {

View File

@ -37,6 +37,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import static java.util.Objects.requireNonNull;
@ -124,6 +125,7 @@ public class FileDownloadTask extends Task<Void> {
this.retry = retry;
setName(file.getName());
setExecutor(Schedulers.io());
}
private void closeFiles() {
@ -145,11 +147,6 @@ public class FileDownloadTask extends Task<Void> {
stream = null;
}
@Override
public Scheduler getScheduler() {
return Schedulers.io();
}
public EventManager<FailedEvent<URL>> getOnFailed() {
return onFailed;
}

View File

@ -30,6 +30,7 @@ import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import static java.nio.charset.StandardCharsets.UTF_8;
@ -59,11 +60,7 @@ public final class GetTask extends Task<String> {
this.retry = retry;
setName(url.toString());
}
@Override
public Scheduler getScheduler() {
return Schedulers.io();
setExecutor(Schedulers.io());
}
public GetTask setCacheRepository(CacheRepository repository) {

View File

@ -1,45 +0,0 @@
/*
* Hello Minecraft! Launcher
* Copyright (C) 2019 huangyuhui <huanghongxun2008@126.com> and contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jackhuang.hmcl.task;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.jackhuang.hmcl.util.function.ExceptionalRunnable;
/**
*
* @author huangyuhui
*/
class SchedulerExecutorService extends Scheduler {
private final ExecutorService executorService;
public SchedulerExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
@Override
public Future<?> schedule(ExceptionalRunnable<?> block) {
if (executorService.isShutdown() || executorService.isTerminated())
return Schedulers.NONE.schedule(block);
else
return executorService.submit(block.toCallable());
}
}

View File

@ -1,92 +0,0 @@
/*
* Hello Minecraft! Launcher
* Copyright (C) 2019 huangyuhui <huanghongxun2008@126.com> and contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jackhuang.hmcl.task;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import org.jackhuang.hmcl.util.function.ExceptionalRunnable;
import org.jetbrains.annotations.NotNull;
/**
*
* @author huangyuhui
*/
class SchedulerImpl extends Scheduler {
private final Executor executor;
public SchedulerImpl(Executor executor) {
this.executor = executor;
}
@Override
public Future<?> schedule(ExceptionalRunnable<?> block) {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> wrapper = new AtomicReference<>();
executor.execute(() -> {
try {
block.run();
} catch (Exception e) {
wrapper.set(e);
} finally {
latch.countDown();
}
Thread.interrupted(); // clear the `interrupted` flag to prevent from interrupting EventDispatch thread.
});
return new Future<Void>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return latch.getCount() == 0;
}
private Void getImpl() throws ExecutionException {
Exception e = wrapper.get();
if (e != null)
throw new ExecutionException(e);
return null;
}
@Override
public Void get() throws InterruptedException, ExecutionException {
latch.await();
return getImpl();
}
@Override
public Void get(long timeout, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (!latch.await(timeout, unit))
throw new TimeoutException();
return getImpl();
}
};
}
}

View File

@ -17,8 +17,10 @@
*/
package org.jackhuang.hmcl.task;
import javafx.application.Platform;
import org.jackhuang.hmcl.util.Logging;
import javax.swing.*;
import java.util.concurrent.*;
/**
@ -32,7 +34,7 @@ public final class Schedulers {
private static volatile ExecutorService CACHED_EXECUTOR;
private static synchronized ExecutorService getCachedExecutorService() {
public static synchronized Executor newThread() {
if (CACHED_EXECUTOR == null)
CACHED_EXECUTOR = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory());
@ -42,7 +44,7 @@ public final class Schedulers {
private static volatile ExecutorService IO_EXECUTOR;
private static synchronized ExecutorService getIOExecutorService() {
public static synchronized Executor io() {
if (IO_EXECUTOR == null)
IO_EXECUTOR = Executors.newFixedThreadPool(6, runnable -> {
Thread thread = Executors.defaultThreadFactory().newThread(runnable);
@ -55,7 +57,7 @@ public final class Schedulers {
private static volatile ExecutorService SINGLE_EXECUTOR;
private static synchronized ExecutorService getSingleExecutorService() {
public static synchronized Executor computation() {
if (SINGLE_EXECUTOR == null)
SINGLE_EXECUTOR = Executors.newSingleThreadExecutor(runnable -> {
Thread thread = Executors.defaultThreadFactory().newThread(runnable);
@ -66,54 +68,18 @@ public final class Schedulers {
return SINGLE_EXECUTOR;
}
private static final Scheduler IMMEDIATE = new SchedulerImpl(Runnable::run);
public static Scheduler immediate() {
return IMMEDIATE;
public static Executor javafx() {
return Platform::runLater;
}
private static Scheduler NEW_THREAD;
public static synchronized Scheduler newThread() {
if (NEW_THREAD == null)
NEW_THREAD = new SchedulerExecutorService(getCachedExecutorService());
return NEW_THREAD;
public static Executor swing() {
return SwingUtilities::invokeLater;
}
private static Scheduler IO;
public static synchronized Scheduler io() {
if (IO == null)
IO = new SchedulerExecutorService(getIOExecutorService());
return IO;
}
private static Scheduler COMPUTATION;
public static synchronized Scheduler computation() {
if (COMPUTATION == null)
COMPUTATION = new SchedulerExecutorService(getSingleExecutorService());
return COMPUTATION;
}
private static final Scheduler JAVAFX = new SchedulerImpl(javafx.application.Platform::runLater);
public static Scheduler javafx() {
return JAVAFX;
}
private static final Scheduler SWING = new SchedulerImpl(javax.swing.SwingUtilities::invokeLater);
public static Scheduler swing() {
return SWING;
}
public static synchronized Scheduler defaultScheduler() {
public static Executor defaultScheduler() {
return newThread();
}
static final Scheduler NONE = new SchedulerImpl(any -> {});
public static synchronized void shutdown() {
Logging.LOG.info("Shutting down executor services.");

View File

@ -36,6 +36,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.logging.Level;
@ -57,37 +58,44 @@ public abstract class Task<T> {
return significance;
}
public void setSignificance(TaskSignificance significance) {
public final void setSignificance(TaskSignificance significance) {
this.significance = significance;
}
// state
private TaskState state = TaskState.READY;
public TaskState getState() {
public final TaskState getState() {
return state;
}
void setState(TaskState state) {
final void setState(TaskState state) {
this.state = state;
}
// last exception
private Exception exception;
public Exception getException() {
public final Exception getException() {
return exception;
}
void setException(Exception e) {
final void setException(Exception e) {
exception = e;
}
private Executor executor = Schedulers.defaultScheduler();
/**
* The scheduler that decides how this task runs.
* The executor that decides how this task runs.
*/
public Scheduler getScheduler() {
return Schedulers.defaultScheduler();
public final Executor getExecutor() {
return executor;
}
public final Task<T> setExecutor(Executor executor) {
this.executor = executor;
return this;
}
// dependents succeeded
@ -142,6 +150,14 @@ public abstract class Task<T> {
return this;
}
@Override
public String toString() {
if (getClass().getName().equals(getName()))
return getName();
else
return getClass().getName() + "[" + getName() + "]";
}
// result
private T result;
private Consumer<T> resultConsumer;
@ -314,7 +330,7 @@ public abstract class Task<T> {
/**
* Returns a new Task that, when this task completes
* normally, is executed using the default Scheduler, with this
* normally, is executed using the default Executor, with this
* task's result as the argument to the supplied function.
*
* @param fn the function to use to compute the value of the returned Task
@ -327,36 +343,36 @@ public abstract class Task<T> {
/**
* Returns a new Task that, when this task completes
* normally, is executed using the supplied Scheduler, with this
* normally, is executed using the supplied Executor, with this
* task's result as the argument to the supplied function.
*
* @param scheduler the executor to use for asynchronous execution
* @param executor the executor to use for asynchronous execution
* @param fn the function to use to compute the value of the returned Task
* @param <U> the function's return type
* @return the new Task
*/
public <U, E extends Exception> Task<U> thenApply(Scheduler scheduler, ExceptionalFunction<T, U, E> fn) {
return thenApply(getCaller(), scheduler, fn);
public <U, E extends Exception> Task<U> thenApply(Executor executor, ExceptionalFunction<T, U, E> fn) {
return thenApply(getCaller(), executor, fn);
}
/**
* Returns a new Task that, when this task completes
* normally, is executed using the supplied Scheduler, with this
* normally, is executed using the supplied Executor, with this
* task's result as the argument to the supplied function.
*
* @param name the name of this new Task for displaying
* @param scheduler the executor to use for asynchronous execution
* @param executor the executor to use for asynchronous execution
* @param fn the function to use to compute the value of the returned Task
* @param <U> the function's return type
* @return the new Task
*/
public <U, E extends Exception> Task<U> thenApply(String name, Scheduler scheduler, ExceptionalFunction<T, U, E> fn) {
return new UniApply<>(name, scheduler, fn);
public <U, E extends Exception> Task<U> thenApply(String name, Executor executor, ExceptionalFunction<T, U, E> fn) {
return new UniApply<>(fn).setExecutor(executor).setName(name);
}
/**
* Returns a new Task that, when this task completes
* normally, is executed using the default Scheduler, with this
* normally, is executed using the default Executor, with this
* task's result as the argument to the supplied action.
*
* @param action the action to perform before completing the
@ -369,29 +385,29 @@ public abstract class Task<T> {
/**
* Returns a new Task that, when this task completes
* normally, is executed using the supplied Scheduler, with this
* normally, is executed using the supplied Executor, with this
* task's result as the argument to the supplied action.
*
* @param action the action to perform before completing the returned Task
* @param scheduler the executor to use for asynchronous execution
* @param executor the executor to use for asynchronous execution
* @return the new Task
*/
public <E extends Exception> Task<Void> thenAccept(Scheduler scheduler, ExceptionalConsumer<T, E> action) {
return thenAccept(getCaller(), scheduler, action);
public <E extends Exception> Task<Void> thenAccept(Executor executor, ExceptionalConsumer<T, E> action) {
return thenAccept(getCaller(), executor, action);
}
/**
* Returns a new Task that, when this task completes
* normally, is executed using the supplied Scheduler, with this
* normally, is executed using the supplied Executor, with this
* task's result as the argument to the supplied action.
*
* @param name the name of this new Task for displaying
* @param action the action to perform before completing the returned Task
* @param scheduler the executor to use for asynchronous execution
* @param executor the executor to use for asynchronous execution
* @return the new Task
*/
public <E extends Exception> Task<Void> thenAccept(String name, Scheduler scheduler, ExceptionalConsumer<T, E> action) {
return thenApply(name, scheduler, result -> {
public <E extends Exception> Task<Void> thenAccept(String name, Executor executor, ExceptionalConsumer<T, E> action) {
return thenApply(name, executor, result -> {
action.accept(result);
return null;
});
@ -399,7 +415,7 @@ public abstract class Task<T> {
/**
* Returns a new Task that, when this task completes
* normally, executes the given action using the default Scheduler.
* normally, executes the given action using the default Executor.
*
* @param action the action to perform before completing the
* returned Task
@ -411,29 +427,29 @@ public abstract class Task<T> {
/**
* Returns a new Task that, when this task completes
* normally, executes the given action using the supplied Scheduler.
* normally, executes the given action using the supplied Executor.
*
* @param action the action to perform before completing the
* returned Task
* @param scheduler the executor to use for asynchronous execution
* @param executor the executor to use for asynchronous execution
* @return the new Task
*/
public <E extends Exception> Task<Void> thenRun(Scheduler scheduler, ExceptionalRunnable<E> action) {
return thenRun(getCaller(), scheduler, action);
public <E extends Exception> Task<Void> thenRun(Executor executor, ExceptionalRunnable<E> action) {
return thenRun(getCaller(), executor, action);
}
/**
* Returns a new Task that, when this task completes
* normally, executes the given action using the supplied Scheduler.
* normally, executes the given action using the supplied Executor.
*
* @param name the name of this new Task for displaying
* @param action the action to perform before completing the
* returned Task
* @param scheduler the executor to use for asynchronous execution
* @param executor the executor to use for asynchronous execution
* @return the new Task
*/
public <E extends Exception> Task<Void> thenRun(String name, Scheduler scheduler, ExceptionalRunnable<E> action) {
return thenApply(name, scheduler, ignore -> {
public <E extends Exception> Task<Void> thenRun(String name, Executor executor, ExceptionalRunnable<E> action) {
return thenApply(name, executor, ignore -> {
action.run();
return null;
});
@ -441,7 +457,7 @@ public abstract class Task<T> {
/**
* Returns a new Task that, when this task completes
* normally, is executed using the default Scheduler.
* normally, is executed using the default Executor.
*
* @param fn the function to use to compute the value of the returned Task
* @param <U> the function's return type
@ -453,7 +469,7 @@ public abstract class Task<T> {
/**
* Returns a new Task that, when this task completes
* normally, is executed using the default Scheduler.
* normally, is executed using the default Executor.
*
* @param name the name of this new Task for displaying
* @param fn the function to use to compute the value of the returned Task
@ -511,7 +527,7 @@ public abstract class Task<T> {
/**
* Returns a new Task that, when this task completes
* normally, executes the given action using the default Scheduler.
* normally, executes the given action using the default Executor.
*
* @param action the action to perform before completing the
* returned Task
@ -523,29 +539,29 @@ public abstract class Task<T> {
/**
* Returns a new Task that, when this task completes
* normally, executes the given action using the supplied Scheduler.
* normally, executes the given action using the supplied Executor.
*
* @param action the action to perform before completing the
* returned Task
* @param scheduler the executor to use for asynchronous execution
* @param executor the executor to use for asynchronous execution
* @return the new Task
*/
public <E extends Exception> Task<Void> withRun(Scheduler scheduler, ExceptionalRunnable<E> action) {
return withRun(getCaller(), scheduler, action);
public <E extends Exception> Task<Void> withRun(Executor executor, ExceptionalRunnable<E> action) {
return withRun(getCaller(), executor, action);
}
/**
* Returns a new Task that, when this task completes
* normally, executes the given action using the supplied Scheduler.
* normally, executes the given action using the supplied Executor.
*
* @param name the name of this new Task for displaying
* @param action the action to perform before completing the
* returned Task
* @param scheduler the executor to use for asynchronous execution
* @param executor the executor to use for asynchronous execution
* @return the new Task
*/
public <E extends Exception> Task<Void> withRun(String name, Scheduler scheduler, ExceptionalRunnable<E> action) {
return new UniCompose<>(() -> Task.runAsync(name, scheduler, action), false);
public <E extends Exception> Task<Void> withRun(String name, Executor executor, ExceptionalRunnable<E> action) {
return new UniCompose<>(() -> Task.runAsync(name, executor, action), false);
}
/**
@ -579,20 +595,15 @@ public abstract class Task<T> {
* with this exception unless this task also completed exceptionally.
*
* @param action the action to perform
* @param scheduler the executor to use for asynchronous execution
* @param executor the executor to use for asynchronous execution
* @return the new Task
*/
public final Task<Void> whenComplete(Scheduler scheduler, FinalizedCallback action) {
public final Task<Void> whenComplete(Executor executor, FinalizedCallback action) {
return new Task<Void>() {
{
setSignificance(TaskSignificance.MODERATE);
}
@Override
public Scheduler getScheduler() {
return scheduler;
}
@Override
public void execute() throws Exception {
if (isDependentsSucceeded() != (Task.this.getException() == null))
@ -618,7 +629,7 @@ public abstract class Task<T> {
public boolean isRelyingOnDependents() {
return false;
}
}.setName(getCaller());
}.setExecutor(executor).setName(getCaller());
}
/**
@ -636,8 +647,8 @@ public abstract class Task<T> {
* @param action the action to perform
* @return the new Task
*/
public Task<Void> whenComplete(Scheduler scheduler, FinalizedCallbackWithResult<T> action) {
return whenComplete(scheduler, (exception -> action.execute(getResult(), exception)));
public Task<Void> whenComplete(Executor executor, FinalizedCallbackWithResult<T> action) {
return whenComplete(executor, (exception -> action.execute(getResult(), exception)));
}
/**
@ -655,8 +666,8 @@ public abstract class Task<T> {
* @param failure the action to perform when this task exceptionally returned
* @return the new Task
*/
public final <E1 extends Exception, E2 extends Exception> Task<Void> whenComplete(Scheduler scheduler, ExceptionalRunnable<E1> success, ExceptionalConsumer<Exception, E2> failure) {
return whenComplete(scheduler, exception -> {
public final <E1 extends Exception, E2 extends Exception> Task<Void> whenComplete(Executor executor, ExceptionalRunnable<E1> success, ExceptionalConsumer<Exception, E2> failure) {
return whenComplete(executor, exception -> {
if (exception == null) {
if (success != null)
try {
@ -688,8 +699,8 @@ public abstract class Task<T> {
* @param failure the action to perform when this task exceptionally returned
* @return the new Task
*/
public <E1 extends Exception, E2 extends Exception> Task<Void> whenComplete(Scheduler scheduler, ExceptionalConsumer<T, E1> success, ExceptionalConsumer<Exception, E2> failure) {
return whenComplete(scheduler, () -> success.accept(getResult()), failure);
public <E1 extends Exception, E2 extends Exception> Task<Void> whenComplete(Executor executor, ExceptionalConsumer<T, E1> success, ExceptionalConsumer<Exception, E2> failure) {
return whenComplete(executor, () -> success.accept(getResult()), failure);
}
public static Task<Void> runAsync(ExceptionalRunnable<?> closure) {
@ -700,12 +711,12 @@ public abstract class Task<T> {
return runAsync(name, Schedulers.defaultScheduler(), closure);
}
public static Task<Void> runAsync(Scheduler scheduler, ExceptionalRunnable<?> closure) {
return runAsync(getCaller(), scheduler, closure);
public static Task<Void> runAsync(Executor executor, ExceptionalRunnable<?> closure) {
return runAsync(getCaller(), executor, closure);
}
public static Task<Void> runAsync(String name, Scheduler scheduler, ExceptionalRunnable<?> closure) {
return new SimpleTask<>(closure.toCallable(), scheduler).setName(name);
public static Task<Void> runAsync(String name, Executor executor, ExceptionalRunnable<?> closure) {
return new SimpleTask<>(closure.toCallable()).setExecutor(executor).setName(name);
}
public static <T> Task<T> composeAsync(ExceptionalSupplier<Task<T>, ?> fn) {
@ -730,16 +741,16 @@ public abstract class Task<T> {
return supplyAsync(getCaller(), callable);
}
public static <V> Task<V> supplyAsync(Scheduler scheduler, Callable<V> callable) {
return supplyAsync(getCaller(), scheduler, callable);
public static <V> Task<V> supplyAsync(Executor executor, Callable<V> callable) {
return supplyAsync(getCaller(), executor, callable);
}
public static <V> Task<V> supplyAsync(String name, Callable<V> callable) {
return supplyAsync(name, Schedulers.defaultScheduler(), callable);
}
public static <V> Task<V> supplyAsync(String name, Scheduler scheduler, Callable<V> callable) {
return new SimpleTask<>(callable, scheduler).setName(name);
public static <V> Task<V> supplyAsync(String name, Executor executor, Callable<V> callable) {
return new SimpleTask<>(callable).setExecutor(executor).setName(name);
}
/**
@ -822,16 +833,9 @@ public abstract class Task<T> {
private static final class SimpleTask<T> extends Task<T> {
private final Callable<T> callable;
private final Scheduler scheduler;
SimpleTask(Callable<T> callable, Scheduler scheduler) {
SimpleTask(Callable<T> callable) {
this.callable = callable;
this.scheduler = scheduler;
}
@Override
public Scheduler getScheduler() {
return scheduler;
}
@Override
@ -841,14 +845,10 @@ public abstract class Task<T> {
}
private class UniApply<R> extends Task<R> {
private final Scheduler scheduler;
private final ExceptionalFunction<T, R, ?> callable;
UniApply(String name, Scheduler scheduler, ExceptionalFunction<T, R, ?> callable) {
this.scheduler = scheduler;
UniApply(ExceptionalFunction<T, R, ?> callable) {
this.callable = callable;
setName(name);
}
@Override
@ -856,11 +856,6 @@ public abstract class Task<T> {
return Collections.singleton(Task.this);
}
@Override
public Scheduler getScheduler() {
return scheduler;
}
@Override
public void execute() throws Exception {
setResult(callable.apply(Task.this.getResult()));

View File

@ -54,6 +54,12 @@ public final class TaskExecutor {
future = executeTasks(Collections.singleton(firstTask))
.thenApplyAsync(exception -> {
boolean success = exception == null;
if (!success) {
// We log exception stacktrace because some of exceptions occurred because of bugs.
Logging.LOG.log(Level.WARNING, "An exception occurred in task execution", exception);
}
taskListeners.forEach(it -> it.onStop(success, this));
return success;
})
@ -71,6 +77,7 @@ public final class TaskExecutor {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException | CancellationException e) {
e.printStackTrace();
}
return false;
}
@ -114,110 +121,113 @@ public final class TaskExecutor {
});
}
private static void scheduleTo(Scheduler scheduler, ExceptionalRunnable<?> runnable) {
scheduleTo(scheduler, runnable, null);
}
private static void scheduleTo(Scheduler scheduler, ExceptionalRunnable<?> runnable, Runnable finalize) {
try {
scheduler.schedule(runnable).get();
} catch (ExecutionException | InterruptedException e) {
throw new CompletionException(e);
} finally {
if (finalize != null)
finalize.run();
}
}
private CompletableFuture<?> executeTask(Task<?> task) {
return CompletableFuture.completedFuture(null).thenComposeAsync(unused -> {
task.setState(Task.TaskState.READY);
return CompletableFuture.completedFuture(null)
.thenComposeAsync(unused -> {
task.setState(Task.TaskState.READY);
if (task.getSignificance().shouldLog())
Logging.LOG.log(Level.FINE, "Executing task: " + task.getName());
if (task.getSignificance().shouldLog())
Logging.LOG.log(Level.FINE, "Executing task: " + task.getName());
taskListeners.forEach(it -> it.onReady(task));
taskListeners.forEach(it -> it.onReady(task));
if (task.doPreExecute()) {
scheduleTo(task.getScheduler(), task::preExecute);
}
return executeTasks(task.getDependents());
}).thenComposeAsync(dependentsException -> {
boolean isDependentsSucceeded = dependentsException == null;
if (!isDependentsSucceeded && task.isRelyingOnDependents()) {
task.setException(dependentsException);
rethrow(dependentsException);
}
if (isDependentsSucceeded)
task.setDependentsSucceeded();
scheduleTo(task.getScheduler(), () -> {
task.setState(Task.TaskState.RUNNING);
taskListeners.forEach(it -> it.onRunning(task));
task.execute();
}, () -> task.setState(Task.TaskState.EXECUTED));
return executeTasks(task.getDependencies());
}).thenApplyAsync(dependenciesException -> {
boolean isDependenciesSucceeded = dependenciesException == null;
if (isDependenciesSucceeded)
task.setDependenciesSucceeded();
if (task.doPostExecute()) {
scheduleTo(task.getScheduler(), task::postExecute);
}
if (!isDependenciesSucceeded && task.isRelyingOnDependencies()) {
Logging.LOG.severe("Subtasks failed for " + task.getName());
task.setException(dependenciesException);
rethrow(dependenciesException);
}
if (task.getSignificance().shouldLog()) {
Logging.LOG.log(Level.FINER, "Task finished: " + task.getName());
}
task.onDone().fireEvent(new TaskEvent(this, task, false));
taskListeners.forEach(it -> it.onFinished(task));
task.setState(Task.TaskState.SUCCEEDED);
return null;
}).thenApplyAsync(unused -> null).exceptionally(throwable -> {
if (!(throwable instanceof CompletionException))
throw new AssertionError();
Throwable resolved = resolveException(throwable);
if (resolved instanceof Exception) {
Exception e = (Exception) resolved;
if (e instanceof InterruptedException) {
task.setException(e);
if (task.getSignificance().shouldLog()) {
Logging.LOG.log(Level.FINE, "Task aborted: " + task.getName());
if (task.doPreExecute()) {
return CompletableFuture.runAsync(wrap(task::preExecute), task.getExecutor());
} else {
return CompletableFuture.completedFuture(null);
}
task.onDone().fireEvent(new TaskEvent(this, task, true));
taskListeners.forEach(it -> it.onFailed(task, e));
} else if (e instanceof CancellationException || e instanceof RejectedExecutionException) {
task.setException(e);
} else {
task.setException(e);
exception = e;
if (task.getSignificance().shouldLog()) {
Logging.LOG.log(Level.FINE, "Task failed: " + task.getName(), e);
})
.thenComposeAsync(unused -> {
return executeTasks(task.getDependents());
})
.thenComposeAsync(dependentsException -> {
boolean isDependentsSucceeded = dependentsException == null;
if (!isDependentsSucceeded && task.isRelyingOnDependents()) {
task.setException(dependentsException);
rethrow(dependentsException);
}
task.onDone().fireEvent(new TaskEvent(this, task, true));
taskListeners.forEach(it -> it.onFailed(task, e));
}
task.setState(Task.TaskState.FAILED);
}
if (isDependentsSucceeded)
task.setDependentsSucceeded();
throw (CompletionException) throwable; // rethrow error
});
return CompletableFuture.runAsync(wrap(() -> {
task.setState(Task.TaskState.RUNNING);
taskListeners.forEach(it -> it.onRunning(task));
task.execute();
}), task.getExecutor()).whenComplete((unused, throwable) -> {
task.setState(Task.TaskState.EXECUTED);
rethrow(throwable);
});
})
.thenComposeAsync(unused -> {
return executeTasks(task.getDependencies());
})
.thenComposeAsync(dependenciesException -> {
boolean isDependenciesSucceeded = dependenciesException == null;
if (isDependenciesSucceeded)
task.setDependenciesSucceeded();
if (task.doPostExecute()) {
return CompletableFuture.runAsync(wrap(task::postExecute), task.getExecutor())
.thenApply(unused -> dependenciesException);
} else {
return CompletableFuture.completedFuture(dependenciesException);
}
})
.thenApplyAsync(dependenciesException -> {
boolean isDependenciesSucceeded = dependenciesException == null;
if (!isDependenciesSucceeded && task.isRelyingOnDependencies()) {
Logging.LOG.severe("Subtasks failed for " + task.getName());
task.setException(dependenciesException);
rethrow(dependenciesException);
}
if (task.getSignificance().shouldLog()) {
Logging.LOG.log(Level.FINER, "Task finished: " + task.getName());
}
task.onDone().fireEvent(new TaskEvent(this, task, false));
taskListeners.forEach(it -> it.onFinished(task));
task.setState(Task.TaskState.SUCCEEDED);
return null;
})
.thenApplyAsync(unused -> null)
.exceptionally(throwable -> {
if (!(throwable instanceof CompletionException))
throw new AssertionError();
Throwable resolved = resolveException(throwable);
if (resolved instanceof Exception) {
Exception e = (Exception) resolved;
if (e instanceof InterruptedException) {
task.setException(e);
if (task.getSignificance().shouldLog()) {
Logging.LOG.log(Level.FINE, "Task aborted: " + task.getName());
}
task.onDone().fireEvent(new TaskEvent(this, task, true));
taskListeners.forEach(it -> it.onFailed(task, e));
} else if (e instanceof CancellationException || e instanceof RejectedExecutionException) {
task.setException(e);
} else {
task.setException(e);
exception = e;
if (task.getSignificance().shouldLog()) {
Logging.LOG.log(Level.FINE, "Task failed: " + task.getName(), e);
}
task.onDone().fireEvent(new TaskEvent(this, task, true));
taskListeners.forEach(it -> it.onFailed(task, e));
}
task.setState(Task.TaskState.FAILED);
}
throw (CompletionException) throwable; // rethrow error
});
}
public int getRunningTasks() {
@ -232,6 +242,8 @@ public final class TaskExecutor {
}
private static void rethrow(Throwable e) {
if (e == null)
return;
if (e instanceof ExecutionException || e instanceof CompletionException) { // including UncheckedException and UncheckedThrowable
rethrow(e.getCause());
} else if (e instanceof RuntimeException) {
@ -240,4 +252,14 @@ public final class TaskExecutor {
throw new CompletionException(e);
}
}
private static Runnable wrap(ExceptionalRunnable<?> runnable) {
return () -> {
try {
runnable.run();
} catch (Exception e) {
rethrow(e);
}
};
}
}

View File

@ -1,6 +1,9 @@
package org.jackhuang.hmcl.util;
import javafx.embed.swing.JFXPanel;
import org.jackhuang.hmcl.task.Schedulers;
import org.jackhuang.hmcl.task.Task;
import org.jackhuang.hmcl.util.platform.JavaVersion;
import org.junit.Assert;
import org.junit.Test;
@ -21,7 +24,7 @@ public class TaskTest {
}))
)).whenComplete(Assert::assertNull).test());
Assert.assertTrue(throwable.get() instanceof Error);
Assert.assertTrue("Error has not been thrown to uncaught exception handler", throwable.get() instanceof Error);
}
/**
@ -29,11 +32,13 @@ public class TaskTest {
*/
@Test
public void testWhenComplete() {
Assert.assertFalse(Task.supplyAsync(() -> {
boolean result = Task.supplyAsync(() -> {
throw new IllegalStateException();
}).whenComplete(exception -> {
Assert.assertTrue(exception instanceof IllegalStateException);
}).test());
}).test();
Assert.assertFalse("Task should fail at this case", result);
}
@Test
@ -45,7 +50,22 @@ public class TaskTest {
bool.set(true);
}).test();
Assert.assertTrue(success);
Assert.assertTrue(bool.get());
Assert.assertTrue("Task should success because withRun will ignore previous exception", success);
Assert.assertTrue("withRun should be executed", bool.get());
}
@Test
public void testThenAccept() {
new JFXPanel(); // init JavaFX Toolkit
AtomicBoolean flag = new AtomicBoolean();
boolean result = Task.supplyAsync(JavaVersion::fromCurrentEnvironment)
.thenAccept(Schedulers.javafx(), javaVersion -> {
flag.set(true);
Assert.assertEquals(javaVersion, JavaVersion.fromCurrentEnvironment());
})
.test();
Assert.assertTrue("Task does not succeed", result);
Assert.assertTrue("ThenAccept has not been executed", flag.get());
}
}