feat: multithreaded downloading

This commit is contained in:
yuhuihuang 2020-09-20 16:23:02 +08:00
parent 042e28216d
commit 66a2ea07bd
4 changed files with 232 additions and 156 deletions

View File

@ -253,7 +253,7 @@ public class YggdrasilService {
try {
return GSON.fromJson(text, typeOfT);
} catch (JsonParseException e) {
throw new ServerResponseMalformedException(e);
throw new ServerResponseMalformedException("Server response: " + text, e);
}
}

View File

@ -161,9 +161,10 @@ public class DownloadManager {
/**
* do something before connection.
*
* @param segment segment
* @param url currently ready URL
*/
protected void onBeforeConnection(URL url) {}
protected void onBeforeConnection(DownloadSegment segment, URL url) {}
/**
* Setup downloading environment, creates files, etc.
@ -266,7 +267,7 @@ public class DownloadManager {
}
if (!segment.isFinished())
return;
max = Math.max(max, segment.startPosition + segment.downloaded);
max = Math.max(max, segment.currentPosition);
}
// All segments have finished downloading.
@ -274,6 +275,15 @@ public class DownloadManager {
future.complete(null);
}
public synchronized final void onSegmentFailed(DownloadSegment failedSegment, Throwable throwable) {
assert(state.segments.contains(failedSegment));
failedSegment.finished = true;
// All segments have finished downloading.
state.finished = true;
future.completeExceptionally(throwable);
}
@Override
public final CompletableFuture<T> getCompletableFuture() {
return CompletableFuture.runAsync(AsyncTaskExecutor.wrap(() -> {
@ -286,7 +296,7 @@ public class DownloadManager {
segment.download(this, downloader);
}))
.thenCompose(unused -> future)
.whenComplete((unused, exception) -> {
.whenCompleteAsync((unused, exception) -> {
if (doFinish) {
try {
setResult(downloader.finish());
@ -348,7 +358,7 @@ public class DownloadManager {
for (int i = 0; i < initialParts; i++) {
int begin = partLength * i;
int end = Math.min((partLength + 1) * i, contentLength);
segments.add(new DownloadSegment(begin, end, 0));
segments.add(new DownloadSegment(i, begin, end, 0));
}
this.waitingForContentLength = contentLength == 0;
}
@ -415,6 +425,10 @@ public class DownloadManager {
return waitingForContentLength;
}
public int getRetry() {
return retry;
}
public synchronized boolean isSegmentSupported() {
return segmentSupported;
}
@ -501,6 +515,10 @@ public class DownloadManager {
}
urls.remove(index);
}
if (url.equals(fastestUrl)) {
fastestUrl = null;
}
}
}
@ -536,9 +554,10 @@ public class DownloadManager {
}
protected static final class DownloadSegment {
private int index;
private int startPosition;
private int endPosition;
private int downloaded;
private int currentPosition;
private boolean finished;
private URLConnection connection;
private Future<?> future;
@ -547,16 +566,21 @@ public class DownloadManager {
* Constructor for Gson
*/
public DownloadSegment() {
this(0, 0, 0);
this(0, 0, 0, 0);
}
public DownloadSegment(int startPosition, int endPosition, int downloaded) {
if (downloaded > endPosition - startPosition) {
throw new IllegalArgumentException("Illegal download state: start " + startPosition + ", end " + endPosition + ", total downloaded " + downloaded);
public DownloadSegment(int index, int startPosition, int endPosition, int currentPosition) {
if (startPosition > endPosition) {
throw new IllegalArgumentException("Illegal download state: start " + startPosition + ", end " + endPosition + ", current " + currentPosition);
}
this.index = index;
this.startPosition = startPosition;
this.endPosition = endPosition;
this.downloaded = downloaded;
this.currentPosition = currentPosition;
}
public int getIndex() {
return index;
}
public int getStartPosition() {
@ -570,19 +594,27 @@ public class DownloadManager {
public void setDownloadRange(int start, int end) {
this.startPosition = start;
this.endPosition = end;
this.downloaded = 0;
this.currentPosition = start;
}
public int getDownloaded() {
return downloaded;
/**
* Get current downloaded position
*
* CurrentPosition may be less than startPosition or larger than endPosition
* when segment unsupported.
*
* @return
*/
public int getCurrentPosition() {
return currentPosition;
}
public void setDownloaded() {
this.downloaded = endPosition - startPosition;
this.currentPosition = endPosition - startPosition;
}
public void setDownloaded(int downloaded) {
this.downloaded = downloaded;
public void setCurrentPosition(int currentPosition) {
this.currentPosition = currentPosition;
}
public int getLength() {
@ -598,7 +630,7 @@ public class DownloadManager {
}
public boolean isFinished() {
return finished || downloaded >= getLength();
return finished || currentPosition >= endPosition;
}
public Future<?> download(DownloadTask<?> task, Downloader<?> downloader) {
@ -607,6 +639,15 @@ public class DownloadManager {
}
return future;
}
@Override
public String toString() {
return "DownloadSegment{" +
"startPosition=" + startPosition +
", endPosition=" + endPosition +
", hash=" + hashCode() +
'}';
}
}
private static final Timer timer = new Timer("DownloadSpeedRecorder", true);

View File

@ -32,7 +32,9 @@ import java.nio.file.Path;
import java.util.concurrent.Callable;
import java.util.logging.Level;
class DownloadSegmentTask implements Callable<Void> {
import static org.jackhuang.hmcl.util.Logging.LOG;
class DownloadSegmentTask implements Runnable {
private final DownloadManager.DownloadTask<?> task;
private final DownloadManager.Downloader<?> downloader;
@ -48,33 +50,48 @@ class DownloadSegmentTask implements Callable<Void> {
this.segment = segment;
}
private URLConnection createConnection(URL url, int startPosition, int endPosition) throws IOException {
URLConnection conn = NetworkUtils.createConnection(url, 4000);
if (startPosition != endPosition) {
conn.setRequestProperty("Range", "bytes=" + startPosition + "-" + (endPosition - 1));
}
return conn;
}
private URLConnection createConnection(boolean retryLastConnection, int startPosition, int endPosition) throws IOException {
if (retryLastConnection && lastURL != null) {
return NetworkUtils.createConnection(lastURL, 4000);
return createConnection(lastURL, startPosition, endPosition);
}
// 1. If we don't know content length now, DownloadSegmentTasks should try
// different candidates.
if (state.isWaitingForContentLength()) {
// Ensure first segment always try url with highest priority first.
if (state.isWaitingForContentLength() && segment.getIndex() != 0) {
URL nextUrlToRetry = state.getNextUrlToRetry();
if (nextUrlToRetry == null) {
return null;
}
lastURL = nextUrlToRetry;
return NetworkUtils.createConnection(lastURL, 4000);
tryTime++;
return createConnection(lastURL, startPosition, endPosition);
}
// 2. try suggested URL at the first time
if (tryTime == 0) {
lastURL = state.getFirstUrl();
return NetworkUtils.createConnection(lastURL, 4000);
tryTime++;
return createConnection(lastURL, startPosition, endPosition);
}
// 3. try fastest URL if measured
URL fastestURL = state.getFastestUrl();
if (fastestURL != null) {
lastURL = fastestURL;
return NetworkUtils.createConnection(lastURL, 4000);
return createConnection(lastURL, startPosition, endPosition);
}
if (tryTime >= state.getRetry()) {
return null;
}
// 4. try other URL, DownloadTaskState will make all DownloadSegmentTask
@ -85,169 +102,187 @@ class DownloadSegmentTask implements Callable<Void> {
}
tryTime++;
lastURL = nextURLToTry;
return NetworkUtils.createConnection(lastURL, 4000);
return createConnection(lastURL, startPosition, endPosition);
}
@Override
public Void call() throws DownloadException {
Exception exception = null;
URL failedURL = null;
boolean checkETag;
switch (task.getCheckETag()) {
case CHECK_E_TAG:
checkETag = true;
break;
case NOT_CHECK_E_TAG:
checkETag = false;
break;
default:
return null;
}
boolean retryLastConnection = false;
while (true) {
if (state.isCancelled() || state.isFinished()) {
break;
public void run() {
try {
Exception exception = null;
URL failedURL = null;
boolean checkETag;
switch (task.getCheckETag()) {
case CHECK_E_TAG:
checkETag = true;
break;
case NOT_CHECK_E_TAG:
checkETag = false;
break;
default:
return;
}
try {
boolean detectRange = state.isWaitingForContentLength();
URLConnection conn = createConnection(retryLastConnection, segment.getStartPosition(), segment.getEndPosition());
if (conn == null) {
boolean retryLastConnection = false;
loop: while (true) {
if (state.isCancelled() || state.isFinished()) {
break;
}
if (checkETag) task.repository.injectConnection(conn);
downloader.onBeforeConnection(lastURL);
segment.setDownloaded(0);
if (conn instanceof HttpURLConnection) {
conn = NetworkUtils.resolveConnection((HttpURLConnection) conn);
}
try (DownloadManager.SafeRegion region = state.checkingConnection()) {
// If other DownloadSegmentTask finishedWithCachedResult
// then this task should stop.
if (state.isFinished()) {
return null;
try {
boolean detectRange = state.isWaitingForContentLength();
boolean connectionSegmented = false;
URLConnection conn = createConnection(retryLastConnection, segment.getStartPosition(), segment.getEndPosition());
if (conn == null) {
break;
}
URL url = conn.getURL();
if (checkETag) task.repository.injectConnection(conn);
LOG.log(Level.INFO, "URL " + url + " " + this);
downloader.onBeforeConnection(segment, lastURL);
if (conn instanceof HttpURLConnection) {
int responseCode = ((HttpURLConnection) conn).getResponseCode();
conn = NetworkUtils.resolveConnection((HttpURLConnection) conn);
}
if (responseCode == HttpURLConnection.HTTP_NOT_MODIFIED) {
// Handle cache
try {
Path cache = task.repository.getCachedRemoteFile(conn);
task.finishWithCachedResult(cache);
return null;
} catch (IOException e) {
Logging.LOG.log(Level.WARNING, "Unable to use cached file, re-download " + lastURL, e);
task.repository.removeRemoteEntry(conn);
// Now we must reconnect the server since 304 may result in empty content,
// if we want to re-download the file, we must reconnect the server without etag settings.
retryLastConnection = true;
try (DownloadManager.SafeRegion region = state.checkingConnection()) {
// If other DownloadSegmentTask finishedWithCachedResult
// then this task should stop.
if (state.isFinished()) {
return;
}
if (conn instanceof HttpURLConnection) {
int responseCode = ((HttpURLConnection) conn).getResponseCode();
if (responseCode == HttpURLConnection.HTTP_NOT_MODIFIED) {
// Handle cache
try {
Path cache = task.repository.getCachedRemoteFile(conn);
task.finishWithCachedResult(cache);
return;
} catch (IOException e) {
LOG.log(Level.WARNING, "Unable to use cached file, re-download " + lastURL, e);
task.repository.removeRemoteEntry(conn);
// Now we must reconnect the server since 304 may result in empty content,
// if we want to re-download the file, we must reconnect the server without etag settings.
retryLastConnection = true;
continue;
}
} else if (responseCode / 100 == 4) {
// 404 may occurs when we hit some mirror that does not have the file,
// but other mirrors may have, so we should try other URLs.
state.forbidURL(lastURL);
continue;
} else if (responseCode / 100 != 2) {
throw new ResponseCodeException(lastURL, responseCode);
}
} else if (responseCode / 100 == 4) {
// 404 may occurs when we hit some mirror that does not have the file,
// but other mirrors may have, so we should try other URLs.
// TODO: maybe some server supports partial content, other servers does not support,
// there should be a way to pick fastest server.
if (conn.getHeaderField("Content-Range") != null && responseCode == 206) {
state.setSegmentSupported(true);
connectionSegmented = true;
}
}
if (state.getContentLength() == 0) {
task.setContentLength(conn.getContentLength());
}
int expectedLength = connectionSegmented ? segment.getLength() : state.getContentLength();
if (expectedLength != conn.getContentLength()) {
// If content length is not expected, forbids this URL
LOG.warning("Content length mismatch " + segment + ", expected: " + expectedLength + ", actual: " + conn.getContentLength());
state.forbidURL(lastURL);
continue;
} else if (responseCode / 100 != 2) {
throw new ResponseCodeException(lastURL, responseCode);
}
// TODO: maybe some server supports partial content, other servers does not support,
// there should be a way to pick fastest server.
if (conn.getHeaderField("Range") != null && responseCode == 206) {
state.setSegmentSupported(true);
// TODO: Currently we mark first successfully connected URL as "fastest" URL.
state.setFastestUrl(conn.getURL());
if (!state.isSegmentSupported() && segment.getStartPosition() != 0) {
// Now we have not figured if URL supports segment downloading,
// and have successfully fetched content length.
// We should check states of non-first DownloadSegmentTasks.
// First DownloadSegmentTask will continue downloading whatever segment is supported or not.
if (detectRange) {
// If this DownloadSegmentTask detects content length,
// reconnect to same URL with header Range, detecting if segment supported.
retryLastConnection = true;
continue;
} else {
// We already tested Range and found segment not supported.
// Make only first DownloadSegmentTask continue.
task.onSegmentFinished(segment);
return;
}
}
}
if (state.getContentLength() == 0) {
task.setContentLength(conn.getContentLength());
}
try (InputStream stream = conn.getInputStream()) {
int startPosition, lastPosition, position;
position = lastPosition = startPosition = connectionSegmented ? segment.getStartPosition() : 0;
segment.setCurrentPosition(startPosition);
byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
while (true) {
if (state.isCancelled()) break;
if (state.getContentLength() != conn.getContentLength()) {
// If content length is not expected, forbids this URL
state.forbidURL(lastURL);
continue;
}
// For first DownloadSegmentTask, if other DownloadSegmentTask have figured out
// that segment is supported, and this segment have already be finished,
// stop downloading.
if (state.isSegmentSupported() && segment.isFinished()) {
break;
}
// TODO: Currently we mark first successfully connected URL as "fastest" URL.
state.setFastestUrl(conn.getURL());
// If some non-first segment started downloading without segment,
// stop it.
if (state.isSegmentSupported() && position < segment.getStartPosition() && segment.getIndex() != 0) {
continue loop;
}
if (!state.isSegmentSupported() && segment.getStartPosition() != 0) {
// Now we have not figured if URL supports segment downloading,
// and have successfully fetched content length.
// We should check states of non-first DownloadSegmentTasks.
// First DownloadSegmentTask will continue downloading whatever segment is supported or not.
if (detectRange) {
// If this DownloadSegmentTask detects content length,
// reconnect to same URL with header Range, detecting if segment supported.
retryLastConnection = true;
continue;
} else {
// We already tested Range and found segment not supported.
// Make only first DownloadSegmentTask continue.
task.onSegmentFinished(segment);
return null;
int len = stream.read(buffer);
if (len == -1) break;
try (DownloadManager.SafeRegion region = state.writing()) {
System.err.println("Write " + url + " segment " + segment + ",pos=" + position + ",len=" + len + ", segmented?=" + connectionSegmented);
downloader.write(position, buffer, 0, len);
}
position += len;
if (conn.getContentLength() >= 0) {
// Update progress information per second
segment.setCurrentPosition(position);
}
DownloadManager.updateDownloadSpeed(position - lastPosition);
lastPosition = position;
}
}
}
try (InputStream stream = conn.getInputStream()) {
int lastDownloaded = 0, downloaded = 0;
byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
while (true) {
if (state.isCancelled()) break;
// For first DownloadSegmentTask, if other DownloadSegmentTask have figured out
// that segment is supported, and this segment have already be finished,
// stop downloading.
if (state.isSegmentSupported() && segment.isFinished()) {
break;
}
int len = stream.read(buffer);
if (len == -1) break;
try (DownloadManager.SafeRegion region = state.writing()) {
downloader.write(segment.getStartPosition() + downloaded, buffer, 0, len);
}
downloaded += len;
if (conn.getContentLength() >= 0) {
// Update progress information per second
segment.setDownloaded(downloaded);
}
DownloadManager.updateDownloadSpeed(downloaded - lastDownloaded);
lastDownloaded = downloaded;
if (conn.getContentLength() >= 0 && !segment.isFinished())
throw new IOException("Unexpected segment size: " + (position - startPosition) + ", expected: " + segment.getLength());
}
if (state.isCancelled()) break;
segment.setConnection(conn);
task.onSegmentFinished(segment);
if (conn.getContentLength() >= 0 && !segment.isFinished())
throw new IOException("Unexpected segment size: " + downloaded + ", expected: " + segment.getLength());
return;
} catch (IOException ex) {
failedURL = lastURL;
exception = ex;
LOG.log(Level.WARNING, "Failed to download " + failedURL + ", repeat times: " + tryTime, ex);
}
segment.setConnection(conn);
task.onSegmentFinished(segment);
return null;
} catch (IOException ex) {
failedURL = lastURL;
exception = ex;
Logging.LOG.log(Level.WARNING, "Failed to download " + failedURL + ", repeat times: " + tryTime, ex);
}
}
if (exception != null)
throw new DownloadException(failedURL, exception);
return null;
if (exception != null)
throw new DownloadException(failedURL, exception);
} catch (Throwable t) {
task.onSegmentFailed(segment, t);
}
}
}

View File

@ -204,8 +204,8 @@ public class FileDownloadTask extends DownloadManager.DownloadTask<Void> {
}
@Override
protected void onBeforeConnection(URL url) {
Logging.LOG.log(Level.FINER, "Downloading " + url + " to " + state.getFile());
protected void onBeforeConnection(DownloadManager.DownloadSegment segment, URL url) {
Logging.LOG.log(Level.FINER, "Downloading segment " + segment.getStartPosition() + "~" + segment.getEndPosition() + " of " + url + " to " + state.getFile());
}
@Override