From 66a2ea07bd5e1c44f2c8c4d2ad9e3f1a60d44ffc Mon Sep 17 00:00:00 2001 From: yuhuihuang Date: Sun, 20 Sep 2020 16:23:02 +0800 Subject: [PATCH] feat: multithreaded downloading --- .../hmcl/auth/yggdrasil/YggdrasilService.java | 2 +- .../jackhuang/hmcl/task/DownloadManager.java | 75 ++++- .../hmcl/task/DownloadSegmentTask.java | 307 ++++++++++-------- .../jackhuang/hmcl/task/FileDownloadTask.java | 4 +- 4 files changed, 232 insertions(+), 156 deletions(-) diff --git a/HMCLCore/src/main/java/org/jackhuang/hmcl/auth/yggdrasil/YggdrasilService.java b/HMCLCore/src/main/java/org/jackhuang/hmcl/auth/yggdrasil/YggdrasilService.java index ed8dd1f01..d50d095d2 100644 --- a/HMCLCore/src/main/java/org/jackhuang/hmcl/auth/yggdrasil/YggdrasilService.java +++ b/HMCLCore/src/main/java/org/jackhuang/hmcl/auth/yggdrasil/YggdrasilService.java @@ -253,7 +253,7 @@ public class YggdrasilService { try { return GSON.fromJson(text, typeOfT); } catch (JsonParseException e) { - throw new ServerResponseMalformedException(e); + throw new ServerResponseMalformedException("Server response: " + text, e); } } diff --git a/HMCLCore/src/main/java/org/jackhuang/hmcl/task/DownloadManager.java b/HMCLCore/src/main/java/org/jackhuang/hmcl/task/DownloadManager.java index 73a3e4c3c..caf331238 100644 --- a/HMCLCore/src/main/java/org/jackhuang/hmcl/task/DownloadManager.java +++ b/HMCLCore/src/main/java/org/jackhuang/hmcl/task/DownloadManager.java @@ -161,9 +161,10 @@ public class DownloadManager { /** * do something before connection. * + * @param segment segment * @param url currently ready URL */ - protected void onBeforeConnection(URL url) {} + protected void onBeforeConnection(DownloadSegment segment, URL url) {} /** * Setup downloading environment, creates files, etc. @@ -266,7 +267,7 @@ public class DownloadManager { } if (!segment.isFinished()) return; - max = Math.max(max, segment.startPosition + segment.downloaded); + max = Math.max(max, segment.currentPosition); } // All segments have finished downloading. @@ -274,6 +275,15 @@ public class DownloadManager { future.complete(null); } + public synchronized final void onSegmentFailed(DownloadSegment failedSegment, Throwable throwable) { + assert(state.segments.contains(failedSegment)); + failedSegment.finished = true; + + // All segments have finished downloading. + state.finished = true; + future.completeExceptionally(throwable); + } + @Override public final CompletableFuture getCompletableFuture() { return CompletableFuture.runAsync(AsyncTaskExecutor.wrap(() -> { @@ -286,7 +296,7 @@ public class DownloadManager { segment.download(this, downloader); })) .thenCompose(unused -> future) - .whenComplete((unused, exception) -> { + .whenCompleteAsync((unused, exception) -> { if (doFinish) { try { setResult(downloader.finish()); @@ -348,7 +358,7 @@ public class DownloadManager { for (int i = 0; i < initialParts; i++) { int begin = partLength * i; int end = Math.min((partLength + 1) * i, contentLength); - segments.add(new DownloadSegment(begin, end, 0)); + segments.add(new DownloadSegment(i, begin, end, 0)); } this.waitingForContentLength = contentLength == 0; } @@ -415,6 +425,10 @@ public class DownloadManager { return waitingForContentLength; } + public int getRetry() { + return retry; + } + public synchronized boolean isSegmentSupported() { return segmentSupported; } @@ -501,6 +515,10 @@ public class DownloadManager { } urls.remove(index); } + + if (url.equals(fastestUrl)) { + fastestUrl = null; + } } } @@ -536,9 +554,10 @@ public class DownloadManager { } protected static final class DownloadSegment { + private int index; private int startPosition; private int endPosition; - private int downloaded; + private int currentPosition; private boolean finished; private URLConnection connection; private Future future; @@ -547,16 +566,21 @@ public class DownloadManager { * Constructor for Gson */ public DownloadSegment() { - this(0, 0, 0); + this(0, 0, 0, 0); } - public DownloadSegment(int startPosition, int endPosition, int downloaded) { - if (downloaded > endPosition - startPosition) { - throw new IllegalArgumentException("Illegal download state: start " + startPosition + ", end " + endPosition + ", total downloaded " + downloaded); + public DownloadSegment(int index, int startPosition, int endPosition, int currentPosition) { + if (startPosition > endPosition) { + throw new IllegalArgumentException("Illegal download state: start " + startPosition + ", end " + endPosition + ", current " + currentPosition); } + this.index = index; this.startPosition = startPosition; this.endPosition = endPosition; - this.downloaded = downloaded; + this.currentPosition = currentPosition; + } + + public int getIndex() { + return index; } public int getStartPosition() { @@ -570,19 +594,27 @@ public class DownloadManager { public void setDownloadRange(int start, int end) { this.startPosition = start; this.endPosition = end; - this.downloaded = 0; + this.currentPosition = start; } - public int getDownloaded() { - return downloaded; + /** + * Get current downloaded position + * + * CurrentPosition may be less than startPosition or larger than endPosition + * when segment unsupported. + * + * @return + */ + public int getCurrentPosition() { + return currentPosition; } public void setDownloaded() { - this.downloaded = endPosition - startPosition; + this.currentPosition = endPosition - startPosition; } - public void setDownloaded(int downloaded) { - this.downloaded = downloaded; + public void setCurrentPosition(int currentPosition) { + this.currentPosition = currentPosition; } public int getLength() { @@ -598,7 +630,7 @@ public class DownloadManager { } public boolean isFinished() { - return finished || downloaded >= getLength(); + return finished || currentPosition >= endPosition; } public Future download(DownloadTask task, Downloader downloader) { @@ -607,6 +639,15 @@ public class DownloadManager { } return future; } + + @Override + public String toString() { + return "DownloadSegment{" + + "startPosition=" + startPosition + + ", endPosition=" + endPosition + + ", hash=" + hashCode() + + '}'; + } } private static final Timer timer = new Timer("DownloadSpeedRecorder", true); diff --git a/HMCLCore/src/main/java/org/jackhuang/hmcl/task/DownloadSegmentTask.java b/HMCLCore/src/main/java/org/jackhuang/hmcl/task/DownloadSegmentTask.java index 9a75724dd..c08688ba2 100644 --- a/HMCLCore/src/main/java/org/jackhuang/hmcl/task/DownloadSegmentTask.java +++ b/HMCLCore/src/main/java/org/jackhuang/hmcl/task/DownloadSegmentTask.java @@ -32,7 +32,9 @@ import java.nio.file.Path; import java.util.concurrent.Callable; import java.util.logging.Level; -class DownloadSegmentTask implements Callable { +import static org.jackhuang.hmcl.util.Logging.LOG; + +class DownloadSegmentTask implements Runnable { private final DownloadManager.DownloadTask task; private final DownloadManager.Downloader downloader; @@ -48,33 +50,48 @@ class DownloadSegmentTask implements Callable { this.segment = segment; } + private URLConnection createConnection(URL url, int startPosition, int endPosition) throws IOException { + URLConnection conn = NetworkUtils.createConnection(url, 4000); + if (startPosition != endPosition) { + conn.setRequestProperty("Range", "bytes=" + startPosition + "-" + (endPosition - 1)); + } + return conn; + } + private URLConnection createConnection(boolean retryLastConnection, int startPosition, int endPosition) throws IOException { if (retryLastConnection && lastURL != null) { - return NetworkUtils.createConnection(lastURL, 4000); + return createConnection(lastURL, startPosition, endPosition); } // 1. If we don't know content length now, DownloadSegmentTasks should try // different candidates. - if (state.isWaitingForContentLength()) { + // Ensure first segment always try url with highest priority first. + if (state.isWaitingForContentLength() && segment.getIndex() != 0) { URL nextUrlToRetry = state.getNextUrlToRetry(); if (nextUrlToRetry == null) { return null; } lastURL = nextUrlToRetry; - return NetworkUtils.createConnection(lastURL, 4000); + tryTime++; + return createConnection(lastURL, startPosition, endPosition); } // 2. try suggested URL at the first time if (tryTime == 0) { lastURL = state.getFirstUrl(); - return NetworkUtils.createConnection(lastURL, 4000); + tryTime++; + return createConnection(lastURL, startPosition, endPosition); } // 3. try fastest URL if measured URL fastestURL = state.getFastestUrl(); if (fastestURL != null) { lastURL = fastestURL; - return NetworkUtils.createConnection(lastURL, 4000); + return createConnection(lastURL, startPosition, endPosition); + } + + if (tryTime >= state.getRetry()) { + return null; } // 4. try other URL, DownloadTaskState will make all DownloadSegmentTask @@ -85,169 +102,187 @@ class DownloadSegmentTask implements Callable { } tryTime++; lastURL = nextURLToTry; - return NetworkUtils.createConnection(lastURL, 4000); + return createConnection(lastURL, startPosition, endPosition); } @Override - public Void call() throws DownloadException { - Exception exception = null; - URL failedURL = null; - boolean checkETag; - switch (task.getCheckETag()) { - case CHECK_E_TAG: - checkETag = true; - break; - case NOT_CHECK_E_TAG: - checkETag = false; - break; - default: - return null; - } - - boolean retryLastConnection = false; - while (true) { - if (state.isCancelled() || state.isFinished()) { - break; + public void run() { + try { + Exception exception = null; + URL failedURL = null; + boolean checkETag; + switch (task.getCheckETag()) { + case CHECK_E_TAG: + checkETag = true; + break; + case NOT_CHECK_E_TAG: + checkETag = false; + break; + default: + return; } - try { - boolean detectRange = state.isWaitingForContentLength(); - URLConnection conn = createConnection(retryLastConnection, segment.getStartPosition(), segment.getEndPosition()); - if (conn == null) { + boolean retryLastConnection = false; + loop: while (true) { + if (state.isCancelled() || state.isFinished()) { break; } - if (checkETag) task.repository.injectConnection(conn); - - downloader.onBeforeConnection(lastURL); - segment.setDownloaded(0); - - if (conn instanceof HttpURLConnection) { - conn = NetworkUtils.resolveConnection((HttpURLConnection) conn); - } - - try (DownloadManager.SafeRegion region = state.checkingConnection()) { - // If other DownloadSegmentTask finishedWithCachedResult - // then this task should stop. - if (state.isFinished()) { - return null; + try { + boolean detectRange = state.isWaitingForContentLength(); + boolean connectionSegmented = false; + URLConnection conn = createConnection(retryLastConnection, segment.getStartPosition(), segment.getEndPosition()); + if (conn == null) { + break; } + URL url = conn.getURL(); + + if (checkETag) task.repository.injectConnection(conn); + + LOG.log(Level.INFO, "URL " + url + " " + this); + downloader.onBeforeConnection(segment, lastURL); + if (conn instanceof HttpURLConnection) { - int responseCode = ((HttpURLConnection) conn).getResponseCode(); + conn = NetworkUtils.resolveConnection((HttpURLConnection) conn); + } - if (responseCode == HttpURLConnection.HTTP_NOT_MODIFIED) { - // Handle cache - try { - Path cache = task.repository.getCachedRemoteFile(conn); - task.finishWithCachedResult(cache); - return null; - } catch (IOException e) { - Logging.LOG.log(Level.WARNING, "Unable to use cached file, re-download " + lastURL, e); - task.repository.removeRemoteEntry(conn); - // Now we must reconnect the server since 304 may result in empty content, - // if we want to re-download the file, we must reconnect the server without etag settings. - retryLastConnection = true; + try (DownloadManager.SafeRegion region = state.checkingConnection()) { + // If other DownloadSegmentTask finishedWithCachedResult + // then this task should stop. + if (state.isFinished()) { + return; + } + + if (conn instanceof HttpURLConnection) { + int responseCode = ((HttpURLConnection) conn).getResponseCode(); + + if (responseCode == HttpURLConnection.HTTP_NOT_MODIFIED) { + // Handle cache + try { + Path cache = task.repository.getCachedRemoteFile(conn); + task.finishWithCachedResult(cache); + return; + } catch (IOException e) { + LOG.log(Level.WARNING, "Unable to use cached file, re-download " + lastURL, e); + task.repository.removeRemoteEntry(conn); + // Now we must reconnect the server since 304 may result in empty content, + // if we want to re-download the file, we must reconnect the server without etag settings. + retryLastConnection = true; + continue; + } + } else if (responseCode / 100 == 4) { + // 404 may occurs when we hit some mirror that does not have the file, + // but other mirrors may have, so we should try other URLs. + state.forbidURL(lastURL); continue; + } else if (responseCode / 100 != 2) { + throw new ResponseCodeException(lastURL, responseCode); } - } else if (responseCode / 100 == 4) { - // 404 may occurs when we hit some mirror that does not have the file, - // but other mirrors may have, so we should try other URLs. + + // TODO: maybe some server supports partial content, other servers does not support, + // there should be a way to pick fastest server. + if (conn.getHeaderField("Content-Range") != null && responseCode == 206) { + state.setSegmentSupported(true); + connectionSegmented = true; + } + } + + if (state.getContentLength() == 0) { + task.setContentLength(conn.getContentLength()); + } + + int expectedLength = connectionSegmented ? segment.getLength() : state.getContentLength(); + if (expectedLength != conn.getContentLength()) { + // If content length is not expected, forbids this URL + LOG.warning("Content length mismatch " + segment + ", expected: " + expectedLength + ", actual: " + conn.getContentLength()); state.forbidURL(lastURL); continue; - } else if (responseCode / 100 != 2) { - throw new ResponseCodeException(lastURL, responseCode); } - // TODO: maybe some server supports partial content, other servers does not support, - // there should be a way to pick fastest server. - if (conn.getHeaderField("Range") != null && responseCode == 206) { - state.setSegmentSupported(true); + // TODO: Currently we mark first successfully connected URL as "fastest" URL. + state.setFastestUrl(conn.getURL()); + + if (!state.isSegmentSupported() && segment.getStartPosition() != 0) { + // Now we have not figured if URL supports segment downloading, + // and have successfully fetched content length. + // We should check states of non-first DownloadSegmentTasks. + // First DownloadSegmentTask will continue downloading whatever segment is supported or not. + if (detectRange) { + // If this DownloadSegmentTask detects content length, + // reconnect to same URL with header Range, detecting if segment supported. + retryLastConnection = true; + continue; + } else { + // We already tested Range and found segment not supported. + // Make only first DownloadSegmentTask continue. + task.onSegmentFinished(segment); + return; + } } } - if (state.getContentLength() == 0) { - task.setContentLength(conn.getContentLength()); - } + try (InputStream stream = conn.getInputStream()) { + int startPosition, lastPosition, position; + position = lastPosition = startPosition = connectionSegmented ? segment.getStartPosition() : 0; + segment.setCurrentPosition(startPosition); + byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE]; + while (true) { + if (state.isCancelled()) break; - if (state.getContentLength() != conn.getContentLength()) { - // If content length is not expected, forbids this URL - state.forbidURL(lastURL); - continue; - } + // For first DownloadSegmentTask, if other DownloadSegmentTask have figured out + // that segment is supported, and this segment have already be finished, + // stop downloading. + if (state.isSegmentSupported() && segment.isFinished()) { + break; + } - // TODO: Currently we mark first successfully connected URL as "fastest" URL. - state.setFastestUrl(conn.getURL()); + // If some non-first segment started downloading without segment, + // stop it. + if (state.isSegmentSupported() && position < segment.getStartPosition() && segment.getIndex() != 0) { + continue loop; + } - if (!state.isSegmentSupported() && segment.getStartPosition() != 0) { - // Now we have not figured if URL supports segment downloading, - // and have successfully fetched content length. - // We should check states of non-first DownloadSegmentTasks. - // First DownloadSegmentTask will continue downloading whatever segment is supported or not. - if (detectRange) { - // If this DownloadSegmentTask detects content length, - // reconnect to same URL with header Range, detecting if segment supported. - retryLastConnection = true; - continue; - } else { - // We already tested Range and found segment not supported. - // Make only first DownloadSegmentTask continue. - task.onSegmentFinished(segment); - return null; + int len = stream.read(buffer); + if (len == -1) break; + + try (DownloadManager.SafeRegion region = state.writing()) { + System.err.println("Write " + url + " segment " + segment + ",pos=" + position + ",len=" + len + ", segmented?=" + connectionSegmented); + downloader.write(position, buffer, 0, len); + } + + position += len; + + if (conn.getContentLength() >= 0) { + // Update progress information per second + segment.setCurrentPosition(position); + } + + DownloadManager.updateDownloadSpeed(position - lastPosition); + lastPosition = position; } - } - } - try (InputStream stream = conn.getInputStream()) { - int lastDownloaded = 0, downloaded = 0; - byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE]; - while (true) { if (state.isCancelled()) break; - // For first DownloadSegmentTask, if other DownloadSegmentTask have figured out - // that segment is supported, and this segment have already be finished, - // stop downloading. - if (state.isSegmentSupported() && segment.isFinished()) { - break; - } - - int len = stream.read(buffer); - if (len == -1) break; - - try (DownloadManager.SafeRegion region = state.writing()) { - downloader.write(segment.getStartPosition() + downloaded, buffer, 0, len); - } - - downloaded += len; - - if (conn.getContentLength() >= 0) { - // Update progress information per second - segment.setDownloaded(downloaded); - } - - DownloadManager.updateDownloadSpeed(downloaded - lastDownloaded); - lastDownloaded = downloaded; + if (conn.getContentLength() >= 0 && !segment.isFinished()) + throw new IOException("Unexpected segment size: " + (position - startPosition) + ", expected: " + segment.getLength()); } - if (state.isCancelled()) break; + segment.setConnection(conn); + task.onSegmentFinished(segment); - if (conn.getContentLength() >= 0 && !segment.isFinished()) - throw new IOException("Unexpected segment size: " + downloaded + ", expected: " + segment.getLength()); + return; + } catch (IOException ex) { + failedURL = lastURL; + exception = ex; + LOG.log(Level.WARNING, "Failed to download " + failedURL + ", repeat times: " + tryTime, ex); } - - segment.setConnection(conn); - task.onSegmentFinished(segment); - - return null; - } catch (IOException ex) { - failedURL = lastURL; - exception = ex; - Logging.LOG.log(Level.WARNING, "Failed to download " + failedURL + ", repeat times: " + tryTime, ex); } - } - if (exception != null) - throw new DownloadException(failedURL, exception); - return null; + if (exception != null) + throw new DownloadException(failedURL, exception); + } catch (Throwable t) { + task.onSegmentFailed(segment, t); + } } } diff --git a/HMCLCore/src/main/java/org/jackhuang/hmcl/task/FileDownloadTask.java b/HMCLCore/src/main/java/org/jackhuang/hmcl/task/FileDownloadTask.java index cc59f7048..45204d33a 100644 --- a/HMCLCore/src/main/java/org/jackhuang/hmcl/task/FileDownloadTask.java +++ b/HMCLCore/src/main/java/org/jackhuang/hmcl/task/FileDownloadTask.java @@ -204,8 +204,8 @@ public class FileDownloadTask extends DownloadManager.DownloadTask { } @Override - protected void onBeforeConnection(URL url) { - Logging.LOG.log(Level.FINER, "Downloading " + url + " to " + state.getFile()); + protected void onBeforeConnection(DownloadManager.DownloadSegment segment, URL url) { + Logging.LOG.log(Level.FINER, "Downloading segment " + segment.getStartPosition() + "~" + segment.getEndPosition() + " of " + url + " to " + state.getFile()); } @Override