Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-S3TransferManager-8858.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "S3 Transfer Manager",
"contributor": "",
"description": "Fix a resource leak in `downloadDirectory` where cancelling the download could still allow file transfers that arrive late to recreate the destination directory. Added a guard in `DownloadDirectoryHelper` to skip file downloads after the operation is cancelled."
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private void doDownloadDirectory(CompletableFuture<CompletedDirectoryDownload> r

CompletableFuture<Void> allOfFutures = new CompletableFuture<>();
AsyncBufferingSubscriber<S3Object> asyncBufferingSubscriber =
new AsyncBufferingSubscriber<>(downloadSingleFile(downloadDirectoryRequest, request,
new AsyncBufferingSubscriber<>(downloadSingleFile(returnFuture, downloadDirectoryRequest, request,
failedFileDownloads),
allOfFutures,
transferConfiguration.option(
Expand All @@ -129,11 +129,13 @@ private void doDownloadDirectory(CompletableFuture<CompletedDirectoryDownload> r
}

private Function<S3Object, CompletableFuture<?>> downloadSingleFile(
CompletableFuture<CompletedDirectoryDownload> returnFuture,
DownloadDirectoryRequest downloadDirectoryRequest,
ListObjectsV2Request listRequest,
Queue<FailedFileDownload> failedFileDownloads) {

return s3Object -> doDownloadSingleFile(downloadDirectoryRequest,
return s3Object -> doDownloadSingleFile(returnFuture,
downloadDirectoryRequest,
failedFileDownloads,
listRequest,
s3Object);
Expand All @@ -158,10 +160,17 @@ private void validatePath(Path destinationDirectory, Path targetPath, String key
}
}

private CompletableFuture<CompletedFileDownload> doDownloadSingleFile(DownloadDirectoryRequest downloadDirectoryRequest,
Collection<FailedFileDownload> failedFileDownloads,
ListObjectsV2Request listRequest,
S3Object s3Object) {
private CompletableFuture<CompletedFileDownload> doDownloadSingleFile(
CompletableFuture<CompletedDirectoryDownload> returnFuture,
DownloadDirectoryRequest downloadDirectoryRequest,
Collection<FailedFileDownload> failedFileDownloads,
ListObjectsV2Request listRequest,
S3Object s3Object) {

if (returnFuture.isDone()) {
return CompletableFutureUtils.failedFuture(
SdkClientException.create("Download was cancelled before file could be started"));
}

Path destinationPath = determineDestinationPath(downloadDirectoryRequest, listRequest, s3Object);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -55,6 +56,7 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.model.EncodingType;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
Expand All @@ -72,6 +74,7 @@
import software.amazon.awssdk.transfer.s3.model.FileDownload;
import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;
import software.amazon.awssdk.transfer.s3.progress.TransferListener;
import software.amazon.awssdk.utils.async.SimplePublisher;

public class DownloadDirectoryHelperTest {
private static final String DIRECTORY_NAME = "test";
Expand Down Expand Up @@ -197,13 +200,21 @@ void downloadDirectory_cancel_shouldCancelAllFutures() throws Exception {
CompletableFuture<CompletedFileDownload> future2 = new CompletableFuture<>();
FileDownload fileDownload2 = newDownload(future2);

when(singleDownloadFunction.apply(any(DownloadFileRequest.class))).thenReturn(fileDownload, fileDownload2);
AtomicInteger callCount = new AtomicInteger(0);
CountDownLatch bothStarted = new CountDownLatch(2);
when(singleDownloadFunction.apply(any(DownloadFileRequest.class))).thenAnswer(invocation -> {
bothStarted.countDown();
return callCount.incrementAndGet() == 1 ? fileDownload : fileDownload2;
});

DirectoryDownload downloadDirectory =
downloadDirectoryHelper.downloadDirectory(DownloadDirectoryRequest.builder()
.destination(directory)
.bucket("bucket")
.build());

assertThat(bothStarted.await(5, TimeUnit.SECONDS)).isTrue();

downloadDirectory.completionFuture().cancel(true);

assertThatThrownBy(() -> future.get(1, TimeUnit.SECONDS))
Expand Down Expand Up @@ -567,6 +578,34 @@ private FileDownload newFailedDownload(SdkClientException exception) {
return fileDownload2;
}

@Test
void downloadDirectory_cancelledFuture_shouldNotCreateDirectories() throws Exception {
SimplePublisher<S3Object> publisher = new SimplePublisher<>();
when(listObjectsHelper.listS3ObjectsRecursively(any(ListObjectsV2Request.class)))
.thenReturn(SdkPublisher.adapt(publisher));

DownloadDirectoryHelper helper = new DownloadDirectoryHelper(
TransferManagerConfiguration.builder().build(),
listObjectsHelper,
singleDownloadFunction);

DirectoryDownload directoryDownload = helper.downloadDirectory(
DownloadDirectoryRequest.builder()
.destination(directory)
.bucket("bucket")
.build());

directoryDownload.completionFuture().cancel(true);

publisher.send(S3Object.builder().key("subdir/file.txt").size(100L).build());
publisher.complete();

Thread.sleep(200);

Path subdir = directory.resolve("subdir");
assertThat(Files.exists(subdir)).isFalse();
}

private FileDownload newDownload(CompletableFuture<CompletedFileDownload> future) {
return new DefaultFileDownload(future,
new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder()
Expand Down
Loading