diff --git a/.changes/next-release/bugfix-S3TransferManager-8858.json b/.changes/next-release/bugfix-S3TransferManager-8858.json new file mode 100644 index 00000000000..2c77bf9c132 --- /dev/null +++ b/.changes/next-release/bugfix-S3TransferManager-8858.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "S3 Transfer Manager", + "contributor": "", + "description": "Fix a resource leak in `downloadDirectory` where cancelling the download could still allow file transfers that arrive late to recreate the destination directory. Added a guard in `DownloadDirectoryHelper` to skip file downloads after the operation is cancelled." +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelper.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelper.java index 69f59473d06..3d22103c4a7 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelper.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelper.java @@ -106,7 +106,7 @@ private void doDownloadDirectory(CompletableFuture r CompletableFuture allOfFutures = new CompletableFuture<>(); AsyncBufferingSubscriber asyncBufferingSubscriber = - new AsyncBufferingSubscriber<>(downloadSingleFile(downloadDirectoryRequest, request, + new AsyncBufferingSubscriber<>(downloadSingleFile(returnFuture, downloadDirectoryRequest, request, failedFileDownloads), allOfFutures, transferConfiguration.option( @@ -129,11 +129,13 @@ private void doDownloadDirectory(CompletableFuture r } private Function> downloadSingleFile( + CompletableFuture returnFuture, DownloadDirectoryRequest downloadDirectoryRequest, ListObjectsV2Request listRequest, Queue failedFileDownloads) { - return s3Object -> doDownloadSingleFile(downloadDirectoryRequest, + return s3Object -> doDownloadSingleFile(returnFuture, + downloadDirectoryRequest, failedFileDownloads, listRequest, s3Object); @@ -158,10 +160,17 @@ private void validatePath(Path destinationDirectory, Path targetPath, String key } } - private CompletableFuture doDownloadSingleFile(DownloadDirectoryRequest downloadDirectoryRequest, - Collection failedFileDownloads, - ListObjectsV2Request listRequest, - S3Object s3Object) { + private CompletableFuture doDownloadSingleFile( + CompletableFuture returnFuture, + DownloadDirectoryRequest downloadDirectoryRequest, + Collection failedFileDownloads, + ListObjectsV2Request listRequest, + S3Object s3Object) { + + if (returnFuture.isDone()) { + return CompletableFutureUtils.failedFuture( + SdkClientException.create("Download was cancelled before file could be started")); + } Path destinationPath = determineDestinationPath(downloadDirectoryRequest, listRequest, s3Object); diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java index dc8536e8fc6..ac3bc291af1 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DownloadDirectoryHelperTest.java @@ -39,6 +39,7 @@ import java.util.UUID; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -55,6 +56,7 @@ import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; +import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.s3.model.EncodingType; import software.amazon.awssdk.services.s3.model.GetObjectRequest; @@ -72,6 +74,7 @@ import software.amazon.awssdk.transfer.s3.model.FileDownload; import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener; import software.amazon.awssdk.transfer.s3.progress.TransferListener; +import software.amazon.awssdk.utils.async.SimplePublisher; public class DownloadDirectoryHelperTest { private static final String DIRECTORY_NAME = "test"; @@ -197,13 +200,21 @@ void downloadDirectory_cancel_shouldCancelAllFutures() throws Exception { CompletableFuture future2 = new CompletableFuture<>(); FileDownload fileDownload2 = newDownload(future2); - when(singleDownloadFunction.apply(any(DownloadFileRequest.class))).thenReturn(fileDownload, fileDownload2); + AtomicInteger callCount = new AtomicInteger(0); + CountDownLatch bothStarted = new CountDownLatch(2); + when(singleDownloadFunction.apply(any(DownloadFileRequest.class))).thenAnswer(invocation -> { + bothStarted.countDown(); + return callCount.incrementAndGet() == 1 ? fileDownload : fileDownload2; + }); DirectoryDownload downloadDirectory = downloadDirectoryHelper.downloadDirectory(DownloadDirectoryRequest.builder() .destination(directory) .bucket("bucket") .build()); + + assertThat(bothStarted.await(5, TimeUnit.SECONDS)).isTrue(); + downloadDirectory.completionFuture().cancel(true); assertThatThrownBy(() -> future.get(1, TimeUnit.SECONDS)) @@ -567,6 +578,34 @@ private FileDownload newFailedDownload(SdkClientException exception) { return fileDownload2; } + @Test + void downloadDirectory_cancelledFuture_shouldNotCreateDirectories() throws Exception { + SimplePublisher publisher = new SimplePublisher<>(); + when(listObjectsHelper.listS3ObjectsRecursively(any(ListObjectsV2Request.class))) + .thenReturn(SdkPublisher.adapt(publisher)); + + DownloadDirectoryHelper helper = new DownloadDirectoryHelper( + TransferManagerConfiguration.builder().build(), + listObjectsHelper, + singleDownloadFunction); + + DirectoryDownload directoryDownload = helper.downloadDirectory( + DownloadDirectoryRequest.builder() + .destination(directory) + .bucket("bucket") + .build()); + + directoryDownload.completionFuture().cancel(true); + + publisher.send(S3Object.builder().key("subdir/file.txt").size(100L).build()); + publisher.complete(); + + Thread.sleep(200); + + Path subdir = directory.resolve("subdir"); + assertThat(Files.exists(subdir)).isFalse(); + } + private FileDownload newDownload(CompletableFuture future) { return new DefaultFileDownload(future, new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder()