feat: WIP download task

This commit is contained in:
huanghongxun 2020-09-19 13:19:40 +08:00
parent 3e5a4b1d49
commit 9f486f5549
8 changed files with 733 additions and 669 deletions

View File

@ -8,5 +8,6 @@ public abstract class CompletableFutureTask<T> extends Task<T> {
@Override
public void execute() throws Exception {
throw new AssertionError("Cannot reach here");
}
}

View File

@ -1,66 +1,250 @@
package org.jackhuang.hmcl.task;
import com.google.gson.JsonParseException;
import org.jackhuang.hmcl.event.Event;
import org.jackhuang.hmcl.event.EventBus;
import org.jackhuang.hmcl.util.CacheRepository;
import org.jackhuang.hmcl.util.ToStringBuilder;
import org.jackhuang.hmcl.util.gson.JsonUtils;
import org.jackhuang.hmcl.util.io.FileUtils;
import org.jetbrains.annotations.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.jackhuang.hmcl.util.Logging.LOG;
class DownloadManager {
public class DownloadManager {
static DownloadTaskState download(List<String> urls, Path file, int initialParts) throws IOException {
Path downloadingFile = file.resolveSibling(FileUtils.getName(file) + ".download");
Path stateFile = file.resolveSibling(FileUtils.getName(file) + ".status");
DownloadState state = null;
if (Files.exists(downloadingFile) && Files.exists(stateFile)) {
// Resume downloading from state
try {
String status = FileUtils.readText(stateFile);
state = JsonUtils.fromNonNullJson(status, DownloadState.class);
} catch (JsonParseException e) {
LOG.log(Level.WARNING, "Failed to parse download state file", e);
}
public static class DownloadTaskStateBuilder {
private List<URL> urls;
private Path file;
private int retry = 3;
private int initialParts = 1;
/**
* Set the url of remote file to be downloaded.
* @param url url of the remote file to be downloaded
* @return this
*/
public DownloadTaskStateBuilder setUrl(URL url) {
this.urls = Collections.singletonList(url);
return this;
}
if (state == null || !urls.equals(state.urls)) {
return DownloadTaskState.newWithLengthUnknown(urls, initialParts);
} else {
return new DownloadTaskState(state);
/**
* Set urls of the remote file to be downloaded, will be attempted in order.
* @param urls urls of remote files to be downloaded
* @return this
*/
public DownloadTaskStateBuilder setUrls(List<URL> urls) {
this.urls = urls;
return this;
}
/**
* Set location to save remote file.
* @param file location to save the remote file.
* @return this
*/
public DownloadTaskStateBuilder setFile(Path file) {
this.file = file;
return this;
}
/**
* Set location to save remote file.
* @param file location to save the remote file.
* @return this
*/
public DownloadTaskStateBuilder setFile(File file) {
this.file = file.toPath();
return this;
}
/**
* Set retry times of one url.
* @param retry retry times of one url.
* @return this
*/
public DownloadTaskStateBuilder setRetry(int retry) {
this.retry = retry;
return this;
}
/**
* Splits the remote file into multiple parts, and download in different
* threads.
*
* @param initialParts number of threads to download the file.
* @return this
*/
public DownloadTaskStateBuilder setInitialParts(int initialParts) {
this.initialParts = initialParts;
return this;
}
public DownloadTaskState build() throws IOException {
if (file == null) {
return DownloadTaskState.newWithLengthUnknown(urls, Files.createTempFile(null, null), retry, initialParts);
}
Path downloadingFile = file.resolveSibling(FileUtils.getName(file) + ".download");
Path stateFile = file.resolveSibling(FileUtils.getName(file) + ".status");
DownloadState state = null;
if (Files.exists(downloadingFile) && Files.exists(stateFile)) {
// Resume downloading from state
try {
String status = FileUtils.readText(stateFile);
state = JsonUtils.fromNonNullJson(status, DownloadState.class);
} catch (JsonParseException e) {
LOG.log(Level.WARNING, "Failed to parse download state file", e);
}
}
if (state == null || !urls.equals(state.urls)) {
return DownloadTaskState.newWithLengthUnknown(urls, file, retry, initialParts);
} else {
return new DownloadTaskState(state, file, retry);
}
}
}
protected static class DownloadTaskState {
private final List<String> urls;
private final List<DownloadSegment> segments;
private final List<Thread> threads;
private String fastestUrl;
private int retry = 0;
private boolean cancelled = false;
protected static class SafeRegion implements AutoCloseable {
final ReentrantLock lock = new ReentrantLock();
DownloadTaskState(DownloadState state) {
urls = new ArrayList<>(state.urls);
segments = new ArrayList<>(state.segments);
threads = IntStream.range(0, state.segments.size()).mapToObj(x -> (Thread) null).collect(Collectors.toList());
void begin() {
lock.lock();
}
DownloadTaskState(List<String> urls, int contentLength, int initialParts) {
void end() {
lock.unlock();
}
@Override
public void close() {
end();
}
}
protected static abstract class DownloadTask<T> extends CompletableFutureTask<T> {
protected final DownloadTaskState state;
protected boolean caching = false;
protected CacheRepository repository = CacheRepository.getInstance();
private final CompletableFuture<T> future = new CompletableFuture<>();
private EnumCheckETag checkETag;
public DownloadTask(DownloadTaskState state) {
this.state = state;
}
public final void setCaching(boolean caching) {
this.caching = caching;
}
public final void setCacheRepository(CacheRepository repository) {
this.repository = repository;
}
public final DownloadTaskState getDownloadState() {
return state;
}
protected abstract void write(byte[] buffer, int offset, int len) throws IOException;
protected EnumCheckETag shouldCheckETag() {
return EnumCheckETag.NOT_CHECK_E_TAG;
}
protected final EnumCheckETag getCheckETag() { return checkETag; }
protected void onBeforeConnection(URL url) {}
protected abstract void onStart() throws IOException;
/**
* Make cached file as result of this task.
*
* @param cachedFile verified cached file
* @throws IOException if an I/O error occurred.
*/
protected void finishWithCachedResult(Path cachedFile) throws IOException {
state.finished = true;
future.complete(getResult());
}
public void finish() throws IOException {
state.finished = true;
future.complete(getResult());
}
@Override
public final CompletableFuture<T> getCompletableFuture() {
return CompletableFuture.runAsync(() -> {
checkETag = shouldCheckETag();
for (Runnable runnable : state.threads)
download().submit(runnable);
}).thenCompose(unused -> future);
}
protected enum EnumCheckETag {
CHECK_E_TAG,
NOT_CHECK_E_TAG,
CACHED
}
}
protected static final class DownloadTaskState {
private final List<URL> urls;
private final Path file;
private final List<DownloadSegment> segments;
private final List<Runnable> threads;
private URL fastestUrl;
private final int retry;
private int retryUrl = 0;
private boolean cancelled = false;
private boolean finished = false;
private int contentLength;
private final int initialParts;
private final SafeRegion connectionCheckRegion = new SafeRegion();
private final SafeRegion writeRegion = new SafeRegion();
DownloadTaskState(DownloadState state, Path file, int retry) {
this.urls = new ArrayList<>(state.urls);
this.file = file;
this.retry = retry;
this.segments = new ArrayList<>(state.segments);
this.threads = IntStream.range(0, state.segments.size()).mapToObj(x -> (Thread) null).collect(Collectors.toList());
this.contentLength = state.getContentLength();
this.initialParts = state.getSegments().size();
}
DownloadTaskState(List<URL> urls, Path file, int retry, int contentLength, int initialParts) {
if (urls == null || urls.size() == 0) {
throw new IllegalArgumentException("DownloadTaskState requires at least one url candidate");
}
this.urls = new ArrayList<>(urls);
segments = new ArrayList<>(initialParts);
threads = new ArrayList<>(initialParts);
this.file = file;
this.retry = retry;
this.initialParts = initialParts;
this.segments = new ArrayList<>(initialParts);
this.threads = new ArrayList<>(initialParts);
int partLength = contentLength / initialParts;
for (int i = 0; i < initialParts; i++) {
int begin = partLength * i;
@ -70,22 +254,64 @@ class DownloadManager {
}
}
public static DownloadTaskState newWithLengthUnknown(List<String> urls, int initialParts) {
return new DownloadTaskState(urls, 0, initialParts);
public static DownloadTaskState newWithLengthUnknown(List<URL> urls, Path file, int retry, int initialParts) {
return new DownloadTaskState(urls, file, retry, 0, initialParts);
}
public List<String> getUrls() {
public synchronized List<URL> getUrls() {
return urls;
}
public Path getFile() {
return file;
}
public Path getDownloadingFile() {
return file.resolveSibling(FileUtils.getName(file) + ".download");
}
public Path getStateFile() {
return file.resolveSibling(FileUtils.getName(file) + ".status");
}
public List<DownloadSegment> getSegments() {
return segments;
}
public String getFirstUrl() {
protected synchronized void setContentLength(int contentLength) {
if (this.contentLength != 0) {
throw new IllegalStateException("ContentLength already set");
}
this.contentLength = contentLength;
if (contentLength < 0) {
return;
}
int partLength = contentLength / initialParts;
for (int i = 0; i < segments.size(); i++) {
int begin = partLength * i;
int end = Math.min((partLength + 1) * i, contentLength);
segments.get(i).setDownloadRange(begin, end);
}
}
public synchronized int getContentLength() {
return contentLength;
}
public synchronized URL getFirstUrl() {
return urls.get(0);
}
public synchronized boolean isFinished() {
return finished;
}
protected synchronized void setFinished(boolean finished) {
this.finished = finished;
}
/**
* Next url for download runnable to retry.
*
@ -93,11 +319,16 @@ class DownloadManager {
* to acquire next url for retry. Making all download runnable try different
* candidates concurrently to speed up finding fastest download source.
*
* If all URLs are tried and tested definitely negative for downloading,
* returns null.
*
* @return next url to retry
*/
public synchronized String getNextUrlToRetry() {
String url = urls.get(retry);
retry = (retry + 1) % urls.size();
@Nullable
public synchronized URL getNextUrlToRetry() {
if (retryUrl < 0 || retryUrl >= urls.size()) return null;
URL url = urls.get(retryUrl);
retryUrl = (retryUrl + 1) % urls.size();
return url;
}
@ -112,11 +343,11 @@ class DownloadManager {
*
* @return fastest url, null if no url have successfully connected yet.
*/
public synchronized String getFastestUrl() {
public synchronized URL getFastestUrl() {
return fastestUrl;
}
public synchronized void setFastestUrl(String fastestUrl) {
public synchronized void setFastestUrl(URL fastestUrl) {
this.fastestUrl = fastestUrl;
}
@ -127,37 +358,64 @@ class DownloadManager {
public synchronized boolean isCancelled() {
return cancelled;
}
public SafeRegion checkingConnection() {
connectionCheckRegion.begin();
return connectionCheckRegion;
}
public SafeRegion writing() {
writeRegion.begin();
return writeRegion;
}
public synchronized void forbidURL(URL url) {
int index;
while ((index = urls.indexOf(url)) != -1) {
if (retryUrl >= index) {
retryUrl--;
}
urls.remove(index);
}
}
}
protected static class DownloadState {
private final List<String> urls;
protected static final class DownloadState {
private final List<URL> urls;
private final List<DownloadSegment> segments;
private final int contentLength;
/**
* Constructor for Gson
*/
public DownloadState() {
this(Collections.emptyList(), Collections.emptyList());
this(Collections.emptyList(), Collections.emptyList(), 0);
}
public DownloadState(List<String> urls, List<DownloadSegment> segments) {
public DownloadState(List<URL> urls, List<DownloadSegment> segments, int contentLength) {
this.urls = urls;
this.segments = segments;
this.contentLength = contentLength;
}
public List<String> getUrls() {
public List<URL> getUrls() {
return urls;
}
public List<DownloadSegment> getSegments() {
return segments;
}
public int getContentLength() {
return contentLength;
}
}
protected static class DownloadSegment {
private final int startPosition;
private final int endPosition;
private int currentPosition;
protected static final class DownloadSegment {
private int startPosition;
private int endPosition;
private int downloaded;
private URLConnection connection;
/**
* Constructor for Gson
@ -166,13 +424,13 @@ class DownloadManager {
this(0, 0, 0);
}
public DownloadSegment(int startPosition, int endPosition, int currentPosition) {
if (currentPosition < startPosition || currentPosition > endPosition) {
throw new IllegalArgumentException("Illegal download state: start " + startPosition + ", end " + endPosition + ", cur " + currentPosition);
public DownloadSegment(int startPosition, int endPosition, int downloaded) {
if (downloaded > endPosition - startPosition) {
throw new IllegalArgumentException("Illegal download state: start " + startPosition + ", end " + endPosition + ", total downloaded " + downloaded);
}
this.startPosition = startPosition;
this.endPosition = endPosition;
this.currentPosition = currentPosition;
this.downloaded = downloaded;
}
public int getStartPosition() {
@ -183,19 +441,117 @@ class DownloadManager {
return endPosition;
}
public int getCurrentPosition() {
return currentPosition;
public void setDownloadRange(int start, int end) {
this.startPosition = start;
this.endPosition = end;
this.downloaded = 0;
}
public void setCurrentPosition(int currentPosition) {
this.currentPosition = currentPosition;
public int getDownloaded() {
return downloaded;
}
public void setDownloaded(int downloaded) {
this.downloaded = downloaded;
}
public int getLength() {
return endPosition - startPosition;
}
public URLConnection getConnection() {
return connection;
}
protected void setConnection(URLConnection connection) {
this.connection = connection;
}
public boolean isFinished() {
return currentPosition == endPosition;
return downloaded == getLength();
}
public boolean isWaiting() { return startPosition == endPosition && startPosition == 0; }
}
private static final Timer timer = new Timer("DownloadSpeedRecorder", true);
private static final AtomicInteger downloadSpeed = new AtomicInteger(0);
public static final EventBus speedEvent = new EventBus();
static {
timer.schedule(new TimerTask() {
@Override
public void run() {
speedEvent.channel(SpeedEvent.class).fireEvent(new SpeedEvent(speedEvent, downloadSpeed.getAndSet(0)));
}
}, 0, 1000);
}
public static void updateDownloadSpeed(int speed) {
downloadSpeed.addAndGet(speed);
}
public static class SpeedEvent extends Event {
private final int speed;
public SpeedEvent(Object source, int speed) {
super(source);
this.speed = speed;
}
/**
* Download speed in byte/sec.
* @return download speed
*/
public int getSpeed() {
return speed;
}
@Override
public String toString() {
return new ToStringBuilder(this).append("speed", speed).toString();
}
}
private static int downloadExecutorConcurrency = Math.min(Runtime.getRuntime().availableProcessors() * 4, 64);
private static volatile ExecutorService DOWNLOAD_EXECUTOR;
/**
* Get singleton instance of the thread pool for file downloading.
*
* @return Thread pool for FetchTask
*/
protected static ExecutorService download() {
if (DOWNLOAD_EXECUTOR == null) {
synchronized (Schedulers.class) {
if (DOWNLOAD_EXECUTOR == null) {
DOWNLOAD_EXECUTOR = new ThreadPoolExecutor(0, downloadExecutorConcurrency, 10, TimeUnit.SECONDS, new SynchronousQueue<>(),
runnable -> {
Thread thread = Executors.defaultThreadFactory().newThread(runnable);
thread.setDaemon(true);
return thread;
});
}
}
}
return DOWNLOAD_EXECUTOR;
}
public static void setDownloadExecutorConcurrency(int concurrency) {
synchronized (Schedulers.class) {
downloadExecutorConcurrency = concurrency;
if (DOWNLOAD_EXECUTOR != null) {
DOWNLOAD_EXECUTOR.shutdownNow();
DOWNLOAD_EXECUTOR = null;
}
}
}
public static int getDownloadExecutorConcurrency() {
synchronized (Schedulers.class) {
return downloadExecutorConcurrency;
}
}
}

View File

@ -0,0 +1,186 @@
package org.jackhuang.hmcl.task;
import org.jackhuang.hmcl.util.Logging;
import org.jackhuang.hmcl.util.io.IOUtils;
import org.jackhuang.hmcl.util.io.NetworkUtils;
import org.jackhuang.hmcl.util.io.ResponseCodeException;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.Path;
import java.util.logging.Level;
class DownloadSegmentTask {
private final DownloadManager.DownloadTask<?> task;
private final DownloadManager.DownloadTaskState state;
private final RandomAccessFile file;
private int tryTime = 0;
private URLConnection conn;
private URL lastURL;
private DownloadManager.DownloadSegment segment;
public DownloadSegmentTask(DownloadManager.DownloadTask<?> task, RandomAccessFile file, DownloadManager.DownloadSegment segment) {
this.task = task;
this.state = task.getDownloadState();
this.file = file;
this.segment = segment;
}
private URLConnection createConnection(boolean retryLastConnection) throws IOException {
if (retryLastConnection && lastURL != null) {
return NetworkUtils.createConnection(lastURL, 4000);
}
// 1. try connection given by DownloadTask
if (this.conn != null) {
URLConnection conn = this.conn;
lastURL = conn.getURL();
this.conn = null;
return conn;
}
// 2. try suggested URL at the first time
if (tryTime == 0) {
lastURL = state.getFirstUrl();
return NetworkUtils.createConnection(lastURL, 4000);
}
// 3. try fastest URL if measured
URL fastestURL = state.getFastestUrl();
if (fastestURL != null) {
lastURL = fastestURL;
return NetworkUtils.createConnection(lastURL, 4000);
}
// 4. try other URL, DownloadTaskState will make all DownloadSegmentTask
// try different URL to speed up connection.
URL nextURLToTry = state.getNextUrlToRetry();
if (nextURLToTry == null) {
return null;
}
tryTime++;
lastURL = nextURLToTry;
return NetworkUtils.createConnection(lastURL, 4000);
}
public void run() throws DownloadException {
Exception exception = null;
URL failedURL = null;
boolean checkETag;
switch (task.getCheckETag()) {
case CHECK_E_TAG:
checkETag = true;
break;
case NOT_CHECK_E_TAG:
checkETag = false;
break;
default:
return;
}
boolean retryLastConnection = false;
while (true) {
if (state.isCancelled() || state.isFinished()) {
break;
}
try {
URLConnection conn = createConnection(retryLastConnection);
if (conn == null) {
break;
}
if (checkETag) task.repository.injectConnection(conn);
task.onBeforeConnection(lastURL);
segment.setDownloaded(0);
try (DownloadManager.SafeRegion region = state.checkingConnection()) {
// If other DownloadSegmentTask finishedWithCachedResult
// then this task should stop.
if (state.isFinished()) {
return;
}
if (state.getContentLength() == 0) {
state.setContentLength(conn.getContentLength());
}
// TODO: reset connection with range
if (conn instanceof HttpURLConnection) {
conn = NetworkUtils.resolveConnection((HttpURLConnection) conn);
int responseCode = ((HttpURLConnection) conn).getResponseCode();
if (responseCode == HttpURLConnection.HTTP_NOT_MODIFIED) {
// Handle cache
try {
Path cache = task.repository.getCachedRemoteFile(conn);
task.finishWithCachedResult(cache);
return;
} catch (IOException e) {
Logging.LOG.log(Level.WARNING, "Unable to use cached file, re-download " + lastURL, e);
task.repository.removeRemoteEntry(conn);
// Now we must reconnect the server since 304 may result in empty content,
// if we want to re-download the file, we must reconnect the server without etag settings.
retryLastConnection = true;
continue;
}
} else if (responseCode / 100 == 4) {
state.forbidURL(lastURL);
break; // we will not try this URL again
} else if (responseCode / 100 != 2) {
throw new ResponseCodeException(lastURL, responseCode);
}
}
}
try (InputStream stream = conn.getInputStream()) {
int lastDownloaded = 0, downloaded = 0;
byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
while (true) {
if (state.isCancelled()) break;
int len = stream.read(buffer);
if (len == -1) break;
try (DownloadManager.SafeRegion region = state.writing()) {
task.write(buffer, 0, len);
}
downloaded += len;
if (conn.getContentLength() >= 0) {
// Update progress information per second
segment.setDownloaded(downloaded);
}
DownloadManager.updateDownloadSpeed(downloaded - lastDownloaded);
lastDownloaded = downloaded;
}
if (state.isCancelled()) break;
if (conn.getContentLength() >= 0 && !segment.isFinished())
throw new IOException("Unexpected segment size: " + downloaded + ", expected: " + segment.getLength());
}
segment.setConnection(conn);
return;
} catch (IOException ex) {
failedURL = lastURL;
exception = ex;
Logging.LOG.log(Level.WARNING, "Failed to download " + failedURL + ", repeat times: " + tryTime, ex);
}
}
if (exception != null)
throw new DownloadException(failedURL, exception);
}
}

View File

@ -1,156 +0,0 @@
package org.jackhuang.hmcl.task;
import org.jackhuang.hmcl.util.CacheRepository;
import org.jackhuang.hmcl.util.Logging;
import org.jackhuang.hmcl.util.io.IOUtils;
import org.jackhuang.hmcl.util.io.NetworkUtils;
import org.jackhuang.hmcl.util.io.ResponseCodeException;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.Path;
import java.util.logging.Level;
abstract class DownloadTask implements Runnable {
private final DownloadManager.DownloadTaskState state;
private final RandomAccessFile file;
private URLConnection conn;
private DownloadManager.DownloadSegment segment;
protected boolean caching;
protected CacheRepository repository = CacheRepository.getInstance();
public DownloadTask(DownloadManager.DownloadTaskState state, RandomAccessFile file, DownloadManager.DownloadSegment segment) {
this.state = state;
this.file = file;
this.segment = segment;
}
public void setCaching(boolean caching) {
this.caching = caching;
}
public void setCacheRepository(CacheRepository repository) {
this.repository = repository;
}
protected void beforeDownload(URL url) throws IOException {
}
protected abstract void useCachedResult(Path cachedFile) throws IOException;
protected abstract FetchTask.EnumCheckETag shouldCheckETag();
protected abstract FetchTask.Context getContext(URLConnection conn, boolean checkETag) throws IOException;
@Override
public void run() {
Exception exception = null;
URL failedURL = null;
boolean checkETag;
switch (shouldCheckETag()) {
case CHECK_E_TAG:
checkETag = true;
break;
case NOT_CHECK_E_TAG:
checkETag = false;
break;
default:
return;
}
int repeat = 0;
while (true) {
if (state.isCancelled()) {
break;
}
String url = repeat == 0 ? state.getFirstUrl() : state.getNextUrlToRetry();
repeat++;
if (url == null) {
break;
}
try {
beforeDownload(url);
updateProgress(0);
URLConnection conn = NetworkUtils.createConnection(NetworkUtils.toURL(url);
if (checkETag) repository.injectConnection(conn);
if (conn instanceof HttpURLConnection) {
conn = NetworkUtils.resolveConnection((HttpURLConnection) conn);
int responseCode = ((HttpURLConnection) conn).getResponseCode();
if (responseCode == HttpURLConnection.HTTP_NOT_MODIFIED) {
// Handle cache
try {
Path cache = repository.getCachedRemoteFile(conn);
useCachedResult(cache);
return;
} catch (IOException e) {
Logging.LOG.log(Level.WARNING, "Unable to use cached file, redownload " + url, e);
repository.removeRemoteEntry(conn);
// Now we must reconnect the server since 304 may result in empty content,
// if we want to redownload the file, we must reconnect the server without etag settings.
repeat--;
continue;
}
} else if (responseCode / 100 == 4) {
break; // we will not try this URL again
} else if (responseCode / 100 != 2) {
throw new ResponseCodeException(url, responseCode);
}
}
long contentLength = conn.getContentLength();
try (FetchTask.Context context = getContext(conn, checkETag); InputStream stream = conn.getInputStream()) {
int lastDownloaded = 0, downloaded = 0;
byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
while (true) {
if (state.isCancelled()) break;
int len = stream.read(buffer);
if (len == -1) break;
context.write(buffer, 0, len);
downloaded += len;
if (contentLength >= 0) {
// Update progress information per second
updateProgress(downloaded, contentLength);
}
updateDownloadSpeed(downloaded - lastDownloaded);
lastDownloaded = downloaded;
}
if (state.isCancelled()) break;
updateDownloadSpeed(downloaded - lastDownloaded);
if (contentLength >= 0 && downloaded != contentLength)
throw new IOException("Unexpected file size: " + downloaded + ", expected: " + contentLength);
context.withResult(true);
}
return;
} catch (IOException ex) {
failedURL = url;
exception = ex;
Logging.LOG.log(Level.WARNING, "Failed to download " + url + ", repeat times: " + repeat, ex);
}
}
if (exception != null)
throw new DownloadException(failedURL, exception);
}
}

View File

@ -1,276 +0,0 @@
/*
* Hello Minecraft! Launcher
* Copyright (C) 2020 huangyuhui <huanghongxun2008@126.com> and contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jackhuang.hmcl.task;
import org.jackhuang.hmcl.event.Event;
import org.jackhuang.hmcl.event.EventBus;
import org.jackhuang.hmcl.util.CacheRepository;
import org.jackhuang.hmcl.util.Logging;
import org.jackhuang.hmcl.util.ToStringBuilder;
import org.jackhuang.hmcl.util.io.IOUtils;
import org.jackhuang.hmcl.util.io.NetworkUtils;
import org.jackhuang.hmcl.util.io.ResponseCodeException;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
public abstract class FetchTask<T> extends Task<T> {
protected final List<URL> urls;
protected final int retry;
protected boolean caching;
protected CacheRepository repository = CacheRepository.getInstance();
public FetchTask(List<URL> urls, int retry) {
if (urls == null || urls.isEmpty())
throw new IllegalArgumentException("At least one URL is required");
this.urls = new ArrayList<>(urls);
this.retry = retry;
setExecutor(Schedulers.io());
}
public void setCaching(boolean caching) {
this.caching = caching;
}
public void setCacheRepository(CacheRepository repository) {
this.repository = repository;
}
protected void beforeDownload(URL url) throws IOException {}
protected abstract void useCachedResult(Path cachedFile) throws IOException;
protected abstract EnumCheckETag shouldCheckETag();
protected abstract Context getContext(URLConnection conn, boolean checkETag) throws IOException;
@Override
public void execute() throws Exception {
Exception exception = null;
URL failedURL = null;
boolean checkETag;
switch (shouldCheckETag()) {
case CHECK_E_TAG: checkETag = true; break;
case NOT_CHECK_E_TAG: checkETag = false; break;
default: return;
}
int repeat = 0;
download: for (URL url : urls) {
for (int retryTime = 0; retryTime < retry; retryTime++) {
if (isCancelled()) {
break download;
}
try {
beforeDownload(url);
updateProgress(0);
URLConnection conn = NetworkUtils.createConnection(url);
if (checkETag) repository.injectConnection(conn);
if (conn instanceof HttpURLConnection) {
conn = NetworkUtils.resolveConnection((HttpURLConnection) conn);
int responseCode = ((HttpURLConnection) conn).getResponseCode();
if (responseCode == HttpURLConnection.HTTP_NOT_MODIFIED) {
// Handle cache
try {
Path cache = repository.getCachedRemoteFile(conn);
useCachedResult(cache);
return;
} catch (IOException e) {
Logging.LOG.log(Level.WARNING, "Unable to use cached file, redownload " + url, e);
repository.removeRemoteEntry(conn);
// Now we must reconnect the server since 304 may result in empty content,
// if we want to redownload the file, we must reconnect the server without etag settings.
retryTime--;
continue;
}
} else if (responseCode / 100 == 4) {
break; // we will not try this URL again
} else if (responseCode / 100 != 2) {
throw new ResponseCodeException(url, responseCode);
}
}
long contentLength = conn.getContentLength();
try (Context context = getContext(conn, checkETag); InputStream stream = conn.getInputStream()) {
int lastDownloaded = 0, downloaded = 0;
byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
while (true) {
if (isCancelled()) break;
int len = stream.read(buffer);
if (len == -1) break;
context.write(buffer, 0, len);
downloaded += len;
if (contentLength >= 0) {
// Update progress information per second
updateProgress(downloaded, contentLength);
}
updateDownloadSpeed(downloaded - lastDownloaded);
lastDownloaded = downloaded;
}
if (isCancelled()) break download;
updateDownloadSpeed(downloaded - lastDownloaded);
if (contentLength >= 0 && downloaded != contentLength)
throw new IOException("Unexpected file size: " + downloaded + ", expected: " + contentLength);
context.withResult(true);
}
return;
} catch (IOException ex) {
failedURL = url;
exception = ex;
Logging.LOG.log(Level.WARNING, "Failed to download " + url + ", repeat times: " + (++repeat), ex);
}
}
}
if (exception != null)
throw new DownloadException(failedURL, exception);
}
private static final Timer timer = new Timer("DownloadSpeedRecorder", true);
private static final AtomicInteger downloadSpeed = new AtomicInteger(0);
public static final EventBus speedEvent = new EventBus();
static {
timer.schedule(new TimerTask() {
@Override
public void run() {
speedEvent.channel(SpeedEvent.class).fireEvent(new SpeedEvent(speedEvent, downloadSpeed.getAndSet(0)));
}
}, 0, 1000);
}
private static void updateDownloadSpeed(int speed) {
downloadSpeed.addAndGet(speed);
}
public static class SpeedEvent extends Event {
private final int speed;
public SpeedEvent(Object source, int speed) {
super(source);
this.speed = speed;
}
/**
* Download speed in byte/sec.
* @return download speed
*/
public int getSpeed() {
return speed;
}
@Override
public String toString() {
return new ToStringBuilder(this).append("speed", speed).toString();
}
}
protected static abstract class Context implements Closeable {
private boolean success;
public abstract void write(byte[] buffer, int offset, int len) throws IOException;
public final void withResult(boolean success) {
this.success = success;
}
protected boolean isSuccess() {
return success;
}
}
protected enum EnumCheckETag {
CHECK_E_TAG,
NOT_CHECK_E_TAG,
CACHED
}
protected class DownloadMission {
}
private static int downloadExecutorConcurrency = Math.min(Runtime.getRuntime().availableProcessors() * 4, 64);
private static volatile ExecutorService DOWNLOAD_EXECUTOR;
/**
* Get singleton instance of the thread pool for file downloading.
*
* @return Thread pool for FetchTask
*/
protected static ExecutorService download() {
if (DOWNLOAD_EXECUTOR == null) {
synchronized (Schedulers.class) {
if (DOWNLOAD_EXECUTOR == null) {
DOWNLOAD_EXECUTOR = new ThreadPoolExecutor(0, downloadExecutorConcurrency, 10, TimeUnit.SECONDS, new SynchronousQueue<>(),
runnable -> {
Thread thread = Executors.defaultThreadFactory().newThread(runnable);
thread.setDaemon(true);
return thread;
});
}
}
}
return DOWNLOAD_EXECUTOR;
}
public static void setDownloadExecutorConcurrency(int concurrency) {
synchronized (Schedulers.class) {
downloadExecutorConcurrency = concurrency;
if (DOWNLOAD_EXECUTOR != null) {
DOWNLOAD_EXECUTOR.shutdownNow();
DOWNLOAD_EXECUTOR = null;
}
}
}
public static int getDownloadExecutorConcurrency() {
synchronized (Schedulers.class) {
return downloadExecutorConcurrency;
}
}
}

View File

@ -22,18 +22,17 @@ import org.jackhuang.hmcl.util.io.ChecksumMismatchException;
import org.jackhuang.hmcl.util.io.CompressingUtils;
import org.jackhuang.hmcl.util.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.math.BigInteger;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.util.*;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Optional;
import java.util.logging.Level;
import static java.util.Objects.requireNonNull;
@ -44,11 +43,11 @@ import static org.jackhuang.hmcl.util.DigestUtils.getDigest;
*
* @author huangyuhui
*/
public class FileDownloadTask extends FetchTask<Void> {
public class FileDownloadTask extends DownloadManager.DownloadTask<Void> {
public static class IntegrityCheck {
private String algorithm;
private String checksum;
private final String algorithm;
private final String checksum;
public IntegrityCheck(String algorithm, String checksum) {
this.algorithm = requireNonNull(algorithm);
@ -80,78 +79,30 @@ public class FileDownloadTask extends FetchTask<Void> {
}
}
private final File file;
private final IntegrityCheck integrityCheck;
private Path candidate;
private RandomAccessFile rFile;
private InputStream stream;
private final ArrayList<IntegrityCheckHandler> integrityCheckHandlers = new ArrayList<>();
/**
* @param url the URL of remote file.
* @param file the location that download to.
*/
public FileDownloadTask(URL url, File file) {
this(url, file, null);
public FileDownloadTask(DownloadManager.DownloadTaskState state) {
this(state, null);
}
/**
* @param url the URL of remote file.
* @param file the location that download to.
* @param integrityCheck the integrity check to perform, null if no integrity check is to be performed
*/
public FileDownloadTask(URL url, File file, IntegrityCheck integrityCheck) {
this(Collections.singletonList(url), file, integrityCheck);
}
/**
* @param url the URL of remote file.
* @param file the location that download to.
* @param integrityCheck the integrity check to perform, null if no integrity check is to be performed
* @param retry the times for retrying if downloading fails.
*/
public FileDownloadTask(URL url, File file, IntegrityCheck integrityCheck, int retry) {
this(Collections.singletonList(url), file, integrityCheck, retry);
}
/**
* Constructor.
* @param urls urls of remote file, will be attempted in order.
* @param file the location that download to.
*/
public FileDownloadTask(List<URL> urls, File file) {
this(urls, file, null);
}
/**
* Constructor.
* @param urls urls of remote file, will be attempted in order.
* @param file the location that download to.
* @param integrityCheck the integrity check to perform, null if no integrity check is to be performed
*/
public FileDownloadTask(List<URL> urls, File file, IntegrityCheck integrityCheck) {
this(urls, file, integrityCheck, 3);
}
/**
* Constructor.
* @param urls urls of remote file, will be attempted in order.
* @param file the location that download to.
* @param integrityCheck the integrity check to perform, null if no integrity check is to be performed
* @param retry the times for retrying if downloading fails.
*/
public FileDownloadTask(List<URL> urls, File file, IntegrityCheck integrityCheck, int retry) {
super(urls, retry);
this.file = file;
public FileDownloadTask(DownloadManager.DownloadTaskState state, IntegrityCheck integrityCheck) {
super(state);
this.integrityCheck = integrityCheck;
setName(file.getName());
}
public File getFile() {
return file;
setName(FileUtils.getName(state.getFile()));
}
/**
* Set candidate that the content should equal to the file to be downloaded.
*
* If candidate set and verified, the file will be taken, and download will not happen.
*
* @param candidate path to candidate
* @return this
*/
public FileDownloadTask setCandidate(Path candidate) {
this.candidate = candidate;
return this;
@ -168,8 +119,8 @@ public class FileDownloadTask extends FetchTask<Void> {
Optional<Path> cache = repository.checkExistentFile(candidate, integrityCheck.getAlgorithm(), integrityCheck.getChecksum());
if (cache.isPresent()) {
try {
FileUtils.copyFile(cache.get().toFile(), file);
Logging.LOG.log(Level.FINER, "Successfully verified file " + file + " from " + urls.get(0));
FileUtils.copyFile(cache.get(), state.getFile());
Logging.LOG.log(Level.FINER, "Successfully verified file " + state.getFile() + " from " + state.getFirstUrl());
return EnumCheckETag.CACHED;
} catch (IOException e) {
Logging.LOG.log(Level.WARNING, "Failed to copy cache files", e);
@ -182,80 +133,75 @@ public class FileDownloadTask extends FetchTask<Void> {
}
@Override
protected void beforeDownload(URL url) {
Logging.LOG.log(Level.FINER, "Downloading " + url + " to " + file);
protected void onBeforeConnection(URL url) {
Logging.LOG.log(Level.FINER, "Downloading " + url + " to " + state.getFile());
}
@Override
protected void useCachedResult(Path cache) throws IOException {
FileUtils.copyFile(cache.toFile(), file);
protected void finishWithCachedResult(Path cache) throws IOException {
FileUtils.copyFile(cache, state.getFile());
}
@Override
protected Context getContext(URLConnection conn, boolean checkETag) throws IOException {
Path temp = Files.createTempFile(null, null);
RandomAccessFile rFile = new RandomAccessFile(temp.toFile(), "rw");
protected void write(byte[] buffer, int offset, int len) throws IOException {
rFile.write(buffer, offset, len);
}
@Override
protected void onStart() throws IOException {
rFile = new RandomAccessFile(state.getDownloadingFile().toFile(), "rw");
}
@Override
public void finish() throws IOException {
MessageDigest digest = integrityCheck == null ? null : integrityCheck.createDigest();
// TODO: digest
return new Context() {
@Override
public void write(byte[] buffer, int offset, int len) throws IOException {
if (digest != null) {
digest.update(buffer, offset, len);
}
try {
rFile.close();
} catch (IOException e) {
Logging.LOG.log(Level.WARNING, "Failed to close file: " + rFile, e);
}
rFile.write(buffer, offset, len);
if (!state.isFinished()) {
try {
Files.delete(state.getDownloadingFile());
} catch (IOException e) {
Logging.LOG.log(Level.WARNING, "Failed to delete file: " + rFile, e);
}
return;
}
@Override
public void close() throws IOException {
try {
rFile.close();
} catch (IOException e) {
Logging.LOG.log(Level.WARNING, "Failed to close file: " + rFile, e);
}
for (IntegrityCheckHandler handler : integrityCheckHandlers) {
handler.checkIntegrity(state.getDownloadingFile(), state.getFile());
}
if (!isSuccess()) {
try {
Files.delete(temp);
} catch (IOException e) {
Logging.LOG.log(Level.WARNING, "Failed to delete file: " + rFile, e);
}
return;
}
Files.deleteIfExists(state.getFile());
if (!FileUtils.makeDirectory(state.getFile().toAbsolutePath().getParent().toFile()))
throw new IOException("Unable to make parent directory " + state.getFile());
for (IntegrityCheckHandler handler : integrityCheckHandlers) {
handler.checkIntegrity(temp, file.toPath());
}
try {
FileUtils.moveFile(state.getDownloadingFile().toFile(), state.getFile().toFile());
} catch (Exception e) {
throw new IOException("Unable to move temp file from " + state.getDownloadingFile() + " to " + state.getFile(), e);
}
Files.deleteIfExists(file.toPath());
if (!FileUtils.makeDirectory(file.getAbsoluteFile().getParentFile()))
throw new IOException("Unable to make parent directory " + file);
// Integrity check
if (integrityCheck != null) {
integrityCheck.performCheck(digest);
}
try {
FileUtils.moveFile(temp.toFile(), file);
} catch (Exception e) {
throw new IOException("Unable to move temp file from " + temp + " to " + file, e);
}
// Integrity check
if (integrityCheck != null) {
integrityCheck.performCheck(digest);
}
if (caching && integrityCheck != null) {
try {
repository.cacheFile(file.toPath(), integrityCheck.getAlgorithm(), integrityCheck.getChecksum());
} catch (IOException e) {
Logging.LOG.log(Level.WARNING, "Failed to cache file", e);
}
}
if (checkETag) {
repository.cacheRemoteFile(file.toPath(), conn);
}
if (caching && integrityCheck != null) {
try {
repository.cacheFile(state.getFile(), integrityCheck.getAlgorithm(), integrityCheck.getChecksum());
} catch (IOException e) {
Logging.LOG.log(Level.WARNING, "Failed to cache file", e);
}
};
}
if (getCheckETag() == EnumCheckETag.CHECK_E_TAG) {
repository.cacheRemoteFile(state.getFile(), state.getSegments().get(0).getConnection());
}
}
public interface IntegrityCheckHandler {

View File

@ -21,11 +21,8 @@ import org.jackhuang.hmcl.util.io.FileUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.*;
import static java.nio.charset.StandardCharsets.UTF_8;
@ -33,31 +30,20 @@ import static java.nio.charset.StandardCharsets.UTF_8;
*
* @author huangyuhui
*/
public final class GetTask extends FetchTask<String> {
public final class GetTask extends DownloadManager.DownloadTask<String> {
private final Charset charset;
private ByteArrayOutputStream baos;
public GetTask(URL url) {
this(url, UTF_8);
public GetTask(DownloadManager.DownloadTaskState state) {
this(state, UTF_8);
}
public GetTask(URL url, Charset charset) {
this(url, charset, 3);
}
public GetTask(URL url, Charset charset, int retry) {
this(Collections.singletonList(url), charset, retry);
}
public GetTask(List<URL> url) {
this(url, UTF_8, 3);
}
public GetTask(List<URL> urls, Charset charset, int retry) {
super(urls, retry);
public GetTask(DownloadManager.DownloadTaskState state, Charset charset) {
super(state);
this.charset = charset;
setName(urls.get(0).toString());
setName(state.getFirstUrl().toString());
}
@Override
@ -66,32 +52,34 @@ public final class GetTask extends FetchTask<String> {
}
@Override
protected void useCachedResult(Path cachedFile) throws IOException {
protected void finishWithCachedResult(Path cachedFile) throws IOException {
setResult(FileUtils.readText(cachedFile));
super.finishWithCachedResult(cachedFile);
}
@Override
protected Context getContext(URLConnection conn, boolean checkETag) {
return new Context() {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
protected void write(byte[] buffer, int offset, int len) {
baos.write(buffer, offset, len);
}
@Override
public synchronized void write(byte[] buffer, int offset, int len) {
baos.write(buffer, offset, len);
}
@Override
protected void onStart() {
baos = new ByteArrayOutputStream();
}
@Override
public void close() throws IOException {
if (!isSuccess()) return;
@Override
public void finish() throws IOException {
if (!state.isFinished()) return;
String result = baos.toString(charset.name());
setResult(result);
String result = baos.toString(charset.name());
setResult(result);
if (checkETag) {
repository.cacheText(result, conn);
}
}
};
if (getCheckETag() == EnumCheckETag.CHECK_E_TAG) {
repository.cacheText(result, state.getSegments().get(0).getConnection());
}
super.finish();
}
}

View File

@ -17,6 +17,8 @@
*/
package org.jackhuang.hmcl.util.io;
import org.jetbrains.annotations.NotNull;
import java.io.*;
import java.net.*;
import java.util.List;
@ -55,23 +57,27 @@ public final class NetworkUtils {
return sb.toString();
}
public static URLConnection createConnection(URL url) throws IOException {
public static URLConnection createConnection(@NotNull URL url) throws IOException {
return createConnection(url, 15000);
}
public static URLConnection createConnection(@NotNull URL url, int timeout) throws IOException {
URLConnection connection = url.openConnection();
connection.setUseCaches(false);
connection.setConnectTimeout(15000);
connection.setReadTimeout(15000);
connection.setConnectTimeout(timeout);
connection.setReadTimeout(timeout);
connection.setRequestProperty("Accept-Language", Locale.getDefault().toString());
return connection;
}
public static HttpURLConnection createHttpConnection(URL url) throws IOException {
public static HttpURLConnection createHttpConnection(@NotNull URL url) throws IOException {
return (HttpURLConnection) createConnection(url);
}
/**
* @see <a href="https://github.com/curl/curl/blob/3f7b1bb89f92c13e69ee51b710ac54f775aab320/lib/transfer.c#L1427-L1461">Curl</a>
* @param location the url to be URL encoded
* @return encoded URL
* @see <a href="https://github.com/curl/curl/blob/3f7b1bb89f92c13e69ee51b710ac54f775aab320/lib/transfer.c#L1427-L1461">Curl</a>
*/
public static String encodeLocation(String location) {
StringBuilder sb = new StringBuilder();
@ -98,18 +104,31 @@ public final class NetworkUtils {
/**
* This method is a work-around that aims to solve problem when "Location" in stupid server's response is not encoded.
* @see <a href="https://github.com/curl/curl/issues/473">Issue with libcurl</a>
*
* @param conn the stupid http connection.
* @return manually redirected http connection.
* @throws IOException if an I/O error occurs.
* @see <a href="https://github.com/curl/curl/issues/473">Issue with libcurl</a>
*/
public static HttpURLConnection resolveConnection(HttpURLConnection conn) throws IOException {
return resolveConnection(conn, 15000);
}
/**
* This method is a work-around that aims to solve problem when "Location" in stupid server's response is not encoded.
*
* @param conn the stupid http connection.
* @param timeout timeout in milliseconds in once connection.
* @return manually redirected http connection.
* @throws IOException if an I/O error occurs.
* @see <a href="https://github.com/curl/curl/issues/473">Issue with libcurl</a>
*/
public static HttpURLConnection resolveConnection(HttpURLConnection conn, int timeout) throws IOException {
int redirect = 0;
while (true) {
conn.setUseCaches(false);
conn.setConnectTimeout(15000);
conn.setReadTimeout(15000);
conn.setConnectTimeout(timeout);
conn.setReadTimeout(timeout);
conn.setInstanceFollowRedirects(false);
Map<String, List<String>> properties = conn.getRequestProperties();
String method = conn.getRequestMethod();