diff --git a/.changes/next-release/bugfix-AWSSDKforJavav2-773e2a6.json b/.changes/next-release/bugfix-AWSSDKforJavav2-773e2a6.json new file mode 100644 index 00000000000..9a9915c0a67 --- /dev/null +++ b/.changes/next-release/bugfix-AWSSDKforJavav2-773e2a6.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS SDK for Java v2", + "contributor": "", + "description": "Fixed an issue where `AsyncRequestBody.split()` did not propagate upstream errors to the in-progress chunk subscriber" +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/NonRetryableSubAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/NonRetryableSubAsyncRequestBody.java index 221f9246a5e..11af0474492 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/NonRetryableSubAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/NonRetryableSubAsyncRequestBody.java @@ -97,6 +97,7 @@ public int partNumber() { return partNumber; } + @Override public void error(Throwable error) { delegate.error(error); } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/RetryableSubAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/RetryableSubAsyncRequestBody.java index 15f8b019910..3951c4ca6d7 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/RetryableSubAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/RetryableSubAsyncRequestBody.java @@ -108,6 +108,11 @@ public long maxLength() { return configuration.maxLength(); } + @Override + public void error(Throwable error) { + delegate.error(error); + } + @Override public long receivedBytesLength() { return bufferedLength; diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java index d4b58b0285e..82e749c14cc 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingPublisher.java @@ -250,6 +250,7 @@ public void onComplete() { @Override public void onError(Throwable t) { log.debug(() -> "Received onError()", t); + currentBody.error(t); downstreamPublisher.error(t); } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SubAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SubAsyncRequestBody.java index 6d3ec1b979a..5f68b64a83c 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SubAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SubAsyncRequestBody.java @@ -39,6 +39,11 @@ public interface SubAsyncRequestBody extends CloseableAsyncRequestBody { */ void complete(); + /** + * Signal that the stream has terminated with an error. No more {@link #send(ByteBuffer)} calls will be made. + */ + void error(Throwable error); + /** * The maximum length of the content this AsyncRequestBody can hold. If the upstream content length is known, this should be * the same as receivedBytesLength diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java index 87ac0b4726f..39326ac4167 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingPublisherTest.java @@ -229,6 +229,75 @@ void retryableSubAsyncRequestBodyEnabled_shouldBeAbleToResubscribe() throws Exec } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void upstreamError_shouldPropagateToCurrentBodySubscriber(boolean enableRetryableSubAsyncRequestBody) throws Exception { + RuntimeException upstreamError = new RuntimeException("upstream failure"); + AsyncRequestBody errorBody = new AsyncRequestBody() { + @Override + public Optional contentLength() { + return Optional.of(20L); + } + + @Override + public void subscribe(Subscriber s) { + s.onSubscribe(new Subscription() { + private int calls = 0; + + @Override + public void request(long n) { + if (calls++ == 0) { + // Send partial data, then error + s.onNext(ByteBuffer.wrap(new byte[3])); + } else { + s.onError(upstreamError); + } + } + + @Override + public void cancel() { + } + }); + } + }; + + SplittingPublisher splittingPublisher = + SplittingPublisher.builder() + .asyncRequestBody(errorBody) + .splitConfiguration(AsyncRequestBodySplitConfiguration.builder() + .chunkSizeInBytes(10L) + .bufferSizeInBytes(20L) + .build()) + .retryableSubAsyncRequestBodyEnabled(enableRetryableSubAsyncRequestBody) + .build(); + + CompletableFuture bodyError = new CompletableFuture<>(); + splittingPublisher.subscribe(requestBody -> { + requestBody.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(ByteBuffer byteBuffer) { + } + + @Override + public void onError(Throwable t) { + bodyError.complete(t); + } + + @Override + public void onComplete() { + } + }); + }); + + Throwable error = bodyError.get(5, TimeUnit.SECONDS); + assertThat(error).isEqualTo(upstreamError); + } + private static void verifySplitContent(AsyncRequestBody asyncRequestBody, int chunkSize) throws Exception { SplittingPublisher splittingPublisher = SplittingPublisher.builder() .asyncRequestBody(asyncRequestBody)